require 'spec_helper' def wait(n=0.2) sleep n # FIXME: hack end FORK_SUPPORTED = begin Process.wait(fork { true }) true rescue Exception => e false end shared_examples "Instrumental Agent" do context do # Inferred: # secure? and verify_cert? are set # Agent options let(:enabled) { true } let(:synchronous) { false } let(:token) { 'test_token' } let(:agent) { Instrumental::Agent.new(token, :collector => server.host_and_port, :synchronous => synchronous, :enabled => enabled, :secure => secure?, :verify_cert => verify_cert?) } # Server options let(:listen) { true } let(:response) { true } let(:authenticate) { true } let(:server) { TestServer.new(:listen => listen, :authenticate => authenticate, :response => response, :secure => secure?) } before do Instrumental::Agent.logger.level = Logger::UNKNOWN @server = server wait end after do agent.stop server.stop end describe Instrumental::Agent, "disabled" do let(:enabled) { false } it "should not connect to the server" do server.connect_count.should == 0 end it "should not connect to the server after receiving a metric" do agent.gauge('disabled_test', 1) wait server.connect_count.should == 0 end it "should no op on flush without reconnect" do 1.upto(100) { agent.gauge('disabled_test', 1) } agent.flush(false) wait server.commands.should be_empty end it "should no op on flush with reconnect" do 1.upto(100) { agent.gauge('disabled_test', 1) } agent.flush(true) wait server.commands.should be_empty end it "should no op on an empty flush" do agent.flush(true) wait server.commands.should be_empty end it "should send metrics to logger" do now = Time.now agent.logger.should_receive(:debug).with("gauge metric 1 #{now.to_i} 1") agent.gauge("metric", 1) end end describe Instrumental::Agent, "enabled" do it "should not connect to the server" do server.connect_count.should == 0 end it "should connect to the server after sending a metric" do agent.increment("test.foo") wait server.connect_count.should == 1 end it "should announce itself, and include version" do agent.increment("test.foo") wait server.commands[0].should =~ /hello .*/ server.commands[0].should =~ / version / server.commands[0].should =~ / hostname / server.commands[0].should =~ / pid / server.commands[0].should =~ / runtime / server.commands[0].should =~ / platform / end it "should authenticate using the token" do agent.increment("test.foo") wait server.commands[1].should == "authenticate test_token" end it "should report a gauge" do now = Time.now agent.gauge('gauge_test', 123) wait server.commands.last.should == "gauge gauge_test 123 #{now.to_i} 1" end it "should report a time as gauge and return the block result" do now = Time.now agent.time("time_value_test") do 1 + 1 end.should == 2 wait server.commands.last.should =~ /gauge time_value_test .* #{now.to_i}/ end it "should return the value gauged" do now = Time.now agent.gauge('gauge_test', 123).should == 123 agent.gauge('gauge_test', 989).should == 989 wait end it "should report a gauge with a set time" do agent.gauge('gauge_test', 123, 555) wait server.commands.last.should == "gauge gauge_test 123 555 1" end it "should report a gauge with a set time and count" do agent.gauge('gauge_test', 123, 555, 111) wait server.commands.last.should == "gauge gauge_test 123 555 111" end it "should report an increment" do now = Time.now agent.increment("increment_test") wait server.commands.last.should == "increment increment_test 1 #{now.to_i} 1" end it "should return the value incremented by" do now = Time.now agent.increment("increment_test").should == 1 agent.increment("increment_test", 5).should == 5 wait end it "should report an increment a value" do now = Time.now agent.increment("increment_test", 2) wait server.commands.last.should == "increment increment_test 2 #{now.to_i} 1" end it "should report an increment with a set time" do agent.increment('increment_test', 1, 555) wait server.commands.last.should == "increment increment_test 1 555 1" end it "should report an increment with a set time and count" do agent.increment('increment_test', 1, 555, 111) wait server.commands.last.should == "increment increment_test 1 555 111" end it "should discard data that overflows the buffer" do with_constants('Instrumental::Agent::MAX_BUFFER' => 3) do 5.times do |i| agent.increment('overflow_test', i + 1, 300) end wait server.commands.should include("increment overflow_test 1 300 1") server.commands.should include("increment overflow_test 2 300 1") server.commands.should include("increment overflow_test 3 300 1") server.commands.should_not include("increment overflow_test 4 300 1") server.commands.should_not include("increment overflow_test 5 300 1") end end it "should send all data in synchronous mode" do with_constants('Instrumental::Agent::MAX_BUFFER' => 3) do agent.synchronous = true 5.times do |i| agent.increment('overflow_test', i + 1, 300) end agent.instance_variable_get(:@queue).size.should == 0 wait # let the server receive the commands server.commands.should include("increment overflow_test 1 300 1") server.commands.should include("increment overflow_test 2 300 1") server.commands.should include("increment overflow_test 3 300 1") server.commands.should include("increment overflow_test 4 300 1") server.commands.should include("increment overflow_test 5 300 1") end end if FORK_SUPPORTED it "should automatically reconnect when forked" do agent.increment('fork_reconnect_test', 1, 2) fork do agent.increment('fork_reconnect_test', 1, 3) # triggers reconnect end wait(1) agent.increment('fork_reconnect_test', 1, 4) # triggers reconnect wait(1) server.connect_count.should == 2 server.commands.should include("increment fork_reconnect_test 1 2 1") server.commands.should include("increment fork_reconnect_test 1 3 1") server.commands.should include("increment fork_reconnect_test 1 4 1") end end it "should never let an exception reach the user" do agent.stub(:send_command).and_raise(Exception.new("Test Exception")) agent.increment('throws_exception', 2).should be_nil wait agent.gauge('throws_exception', 234).should be_nil wait end it "should let exceptions in time bubble up" do expect { agent.time('za') { raise "fail" } }.to raise_error end it "should return nil if the user overflows the MAX_BUFFER" do 1.upto(Instrumental::Agent::MAX_BUFFER) do agent.increment("test").should == 1 thread = agent.instance_variable_get(:@thread) thread.kill end agent.increment("test").should be_nil end it "should track invalid metrics" do agent.logger.should_receive(:warn).with(/%%/) agent.increment(' %% .!#@$%^&*', 1, 1) wait server.commands.join("\n").should include("increment agent.invalid_metric") end it "should allow reasonable metric names" do agent.increment('a') agent.increment('a.b') agent.increment('hello.world') agent.increment('ThisIsATest.Of.The.Emergency.Broadcast.System.12345') wait server.commands.join("\n").should_not include("increment agent.invalid_metric") end it "should track invalid values" do agent.logger.should_receive(:warn).with(/hello.*testington/) agent.increment('testington', 'hello') wait server.commands.join("\n").should include("increment agent.invalid_value") end it "should allow reasonable values" do agent.increment('a', -333.333) agent.increment('a', -2.2) agent.increment('a', -1) agent.increment('a', 0) agent.increment('a', 1) agent.increment('a', 2.2) agent.increment('a', 333.333) agent.increment('a', Float::EPSILON) wait server.commands.join("\n").should_not include("increment agent.invalid_value") end it "should send notices to the server" do tm = Time.now agent.notice("Test note", tm) wait server.commands.join("\n").should include("notice #{tm.to_i} 0 Test note") end it "should prevent a note w/ newline characters from being sent to the server" do agent.notice("Test note\n").should be_nil wait server.commands.join("\n").should_not include("notice Test note") end it "should allow outgoing metrics to be stopped" do tm = Time.now agent.increment("foo.bar", 1, tm) agent.stop wait agent.increment("foo.baz", 1, tm) wait server.commands.join("\n").should include("increment foo.baz 1 #{tm.to_i}") server.commands.join("\n").should_not include("increment foo.bar 1 #{tm.to_i}") end it "should allow flushing pending values to the server" do 1.upto(100) { agent.gauge('a', rand(50)) } agent.instance_variable_get(:@queue).size.should > 0 agent.flush agent.instance_variable_get(:@queue).size.should == 0 wait server.commands.grep(/^gauge a /).size.should == 100 end it "should no op on an empty flush" do agent.flush(true) wait server.commands.should be_empty end end describe Instrumental::Agent, "connection problems" do it "should automatically reconnect on disconnect" do agent.increment("reconnect_test", 1, 1234) wait server.disconnect_all wait(1) agent.increment('reconnect_test', 1, 5678) # triggers reconnect wait(1) server.connect_count.should == 2 server.commands.last.should == "increment reconnect_test 1 5678 1" end context 'not listening' do let(:listen) { false } it "should buffer commands when server is down" do agent.increment('reconnect_test', 1, 1234) wait agent.queue.pop(true).should include("increment reconnect_test 1 1234 1\n") end it "should warn once when buffer is full" do with_constants('Instrumental::Agent::MAX_BUFFER' => 3) do wait agent.logger.should_receive(:warn).with(/Queue full/).once agent.increment('buffer_full_warn_test', 1, 1234) agent.increment('buffer_full_warn_test', 1, 1234) agent.increment('buffer_full_warn_test', 1, 1234) agent.increment('buffer_full_warn_test', 1, 1234) agent.increment('buffer_full_warn_test', 1, 1234) end end end context 'not responding' do let(:response) { false } it "should buffer commands when server is not responsive" do agent.increment('reconnect_test', 1, 1234) wait agent.queue.pop(true).should include("increment reconnect_test 1 1234 1\n") end end context 'not authenticating' do let(:authenticate) { false } it "should buffer commands when authentication fails" do agent.increment('reconnect_test', 1, 1234) wait agent.queue.pop(true).should include("increment reconnect_test 1 1234 1\n") end end if FORK_SUPPORTED it "should send commands in a short-lived process" do if pid = fork { agent.increment('foo', 1, 1234) } Process.wait(pid) server.commands.last.should == "increment foo 1 1234 1" end end it "should send commands in a process that bypasses at_exit when using #cleanup" do if pid = fork { agent.increment('foo', 1, 1234); agent.cleanup; exit! } Process.wait(pid) server.commands.last.should == "increment foo 1 1234 1" end end it "should not wait longer than EXIT_FLUSH_TIMEOUT seconds to exit a process" do agent.stub(:open_socket) { |*args, &block| sleep(5) && block.call } with_constants('Instrumental::Agent::EXIT_FLUSH_TIMEOUT' => 3) do if (pid = fork { agent.increment('foo', 1) }) tm = Time.now.to_f Process.wait(pid) diff = Time.now.to_f - tm diff.abs.should >= 3 diff.abs.should < 5 end end end it "should not wait to exit a process if there are no commands queued" do agent.stub(:open_socket) { |*args, &block| sleep(5) && block.call } with_constants('Instrumental::Agent::EXIT_FLUSH_TIMEOUT' => 3) do if (pid = fork { agent.increment('foo', 1); agent.queue.clear }) tm = Time.now.to_f Process.wait(pid) diff = Time.now.to_f - tm diff.should < 1 end end end end it "should not wait longer than EXIT_FLUSH_TIMEOUT to attempt flushing the socket when disconnecting" do agent.increment('foo', 1) wait agent.should_receive(:flush_socket) do r, w = IO.pipe Thread.new do IO.select([r]) # mimic an endless blocking select poll end.join end with_constants('Instrumental::Agent::EXIT_FLUSH_TIMEOUT' => 3) do tm = Time.now.to_f agent.cleanup diff = Time.now.to_f - tm diff.should <= 3 end end end describe Instrumental::Agent, "enabled with sync option" do let(:synchronous) { true } it "should send all data in synchronous mode" do with_constants('Instrumental::Agent::MAX_BUFFER' => 3) do 5.times do |i| agent.increment('overflow_test', i + 1, 300) end wait # let the server receive the commands server.commands.should include("increment overflow_test 1 300 1") server.commands.should include("increment overflow_test 2 300 1") server.commands.should include("increment overflow_test 3 300 1") server.commands.should include("increment overflow_test 4 300 1") server.commands.should include("increment overflow_test 5 300 1") end end end end end describe "Insecure" do let(:secure?) { false } let(:verify_cert?) { false } it_behaves_like "Instrumental Agent" end describe "Secure without cert verify" do let(:secure?) { true } let(:verify_cert?) { false } it_behaves_like "Instrumental Agent" it "should be disabled if the system does not allow secure connections but the user specifically requested secure" do Instrumental::Agent.any_instance.stub(:allows_secure?) { false } agent = Instrumental::Agent.new('test-token', :enabled => true, :secure => true) agent.secure.should == false agent.enabled.should == false end it "should be fallback to insecure if the system does not allow secure connections but the user did not specifically request secure" do Instrumental::Agent.any_instance.stub(:allows_secure?) { false } agent = Instrumental::Agent.new('test-token', :enabled => true) agent.secure.should == false agent.enabled.should == true end end