require_relative '../../helper'

require 'fluent/plugin/out_forward/socket_cache'
require 'timecop'

class SocketCacheTest < Test::Unit::TestCase
  sub_test_case 'checkout_or' do
    test 'when given key does not exist' do
      c = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
      sock = mock!.open { 'socket' }.subject
      assert_equal('socket', c.checkout_or('key') { sock.open })
    end

    test 'when given key exists' do
      c = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
      socket = 'socket'
      assert_equal(socket, c.checkout_or('key') { socket })
      c.checkin(socket)

      sock = dont_allow(mock!).open
      assert_equal(socket, c.checkout_or('key') { sock.open })
    end

    test 'when given key exists but used by other' do
      c = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
      assert_equal(@sock, c.checkout_or('key') { @sock })

      new_sock = 'new sock'
      sock = mock!.open { new_sock }.subject
      assert_equal(new_sock, c.checkout_or('key') { sock.open })
    end

    test "when given key's value was expired" do
      c = Fluent::Plugin::ForwardOutput::SocketCache.new(0, $log)
      assert_equal(@sock, c.checkout_or('key') { @sock })

      new_sock = 'new sock'
      sock = mock!.open { new_sock }.subject
      assert_equal(new_sock, c.checkout_or('key') { sock.open })
    end

    test 'reuse same hash object after calling purge_obsolete_socks' do
      c = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
      c.checkout_or('key') { 'socket' }
      c.purge_obsolete_socks

      assert_nothing_raised(NoMethodError) do
        c.checkout_or('key') { 'new socket' }
      end
    end
  end

  sub_test_case 'checkin' do
    test 'when value exists' do
      c = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
      socket = 'socket'
      c.checkout_or('key') { socket }
      c.checkin(socket)

      assert_equal(socket, c.instance_variable_get(:@available_sockets)['key'].first.sock)
      assert_equal(1, c.instance_variable_get(:@available_sockets)['key'].size)
      assert_equal(0, c.instance_variable_get(:@inflight_sockets).size)
    end

    test 'when value does not exist' do
      c = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
      c.checkout_or('key') { 'sock' }
      c.checkin('other sock')

      assert_equal(0, c.instance_variable_get(:@available_sockets)['key'].size)
      assert_equal(1, c.instance_variable_get(:@inflight_sockets).size)
    end
  end

  test 'revoke' do
    c = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
    socket = 'socket'
    c.checkout_or('key') { socket }
    c.revoke(socket)

    assert_equal(1, c.instance_variable_get(:@inactive_sockets).size)
    assert_equal(0, c.instance_variable_get(:@inflight_sockets).size)
    assert_equal(0, c.instance_variable_get(:@available_sockets)['key'].size)

    sock = mock!.open { 1 }.subject
    assert_equal(1, c.checkout_or('key') { sock.open })
  end

  sub_test_case 'clear' do
    test 'when value is in available_sockets' do
      c = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
      m = mock!.close { 'closed' }.subject
      m2 = mock!.close { 'closed' }.subject
      m3 = mock!.close { 'closed' }.subject
      c.checkout_or('key') { m }
      c.revoke(m)
      c.checkout_or('key') { m2 }
      c.checkin(m2)
      c.checkout_or('key2') { m3 }

      assert_equal(1, c.instance_variable_get(:@inflight_sockets).size)
      assert_equal(1, c.instance_variable_get(:@available_sockets)['key'].size)
      assert_equal(1, c.instance_variable_get(:@inactive_sockets).size)

      c.clear
      assert_equal(0, c.instance_variable_get(:@inflight_sockets).size)
      assert_equal(0, c.instance_variable_get(:@available_sockets)['key'].size)
      assert_equal(0, c.instance_variable_get(:@inactive_sockets).size)
    end
  end

  sub_test_case 'purge_obsolete_socks' do
    test 'delete key in inactive_socks' do
      c = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
      sock = mock!.close { 'closed' }.subject
      c.checkout_or('key') { sock }
      c.revoke(sock)
      assert_false(c.instance_variable_get(:@inactive_sockets).empty?)

      c.purge_obsolete_socks
      assert_true(c.instance_variable_get(:@inactive_sockets).empty?)
    end

    test 'move key from available_sockets to inactive_sockets' do
      Timecop.freeze(Time.parse('2016-04-13 14:00:00 +0900'))

      c = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
      sock = mock!.close { 'closed' }.subject
      sock2 = dont_allow(mock!).close
      stub(sock).inspect
      stub(sock2).inspect

      c.checkout_or('key') { sock }
      c.checkin(sock)

      # wait timeout
      Timecop.freeze(Time.parse('2016-04-13 14:20:00 +0900'))
      c.checkout_or('key') { sock2 }

      assert_equal(1, c.instance_variable_get(:@inflight_sockets).size)
      assert_equal(sock2, c.instance_variable_get(:@inflight_sockets).values.first.sock)

      c.purge_obsolete_socks

      assert_equal(1, c.instance_variable_get(:@inflight_sockets).size)
      assert_equal(sock2, c.instance_variable_get(:@inflight_sockets).values.first.sock)
    end
  end
end