# encoding: utf-8
require 'rspec/wait'
require "logstash/devutils/rspec/spec_helper"
require 'logstash/plugin_mixins/ecs_compatibility_support/spec_helper'
require_relative '../../../lib/logstash/inputs/snmp'

describe LogStash::Inputs::Snmp, :ecs_compatibility_support do
  let(:mock_target) { double("org.snmp4j.Target") }
  let(:mock_client) { double("org.logstash.snmp.SnmpClient") }
  let(:mock_aggregator) { double("org.logstash.snmp.SnmpClientRequestAggregator") }
  let(:mock_aggregator_request) { double("org.logstash.snmp.SnmpClientRequestAggregator#Request") }
  let(:config) { {} }

  subject(:plugin) { described_class.new(config) }

  context "an interruptible input plugin" do
    let(:config) {{ "get" => ["1.3.6.1.2.1.1.1.0"], "hosts" => [{"host" => "udp:127.0.0.1/161", "community" => "public"}] }}

    context "#stop" do
      let(:queue) { SizedQueue.new(20) }

      before(:each) do
        allow(plugin).to receive(:build_client!).and_return(mock_client)
        allow(mock_client).to receive(:listen)
        allow(mock_client).to receive(:create_target).and_return(mock_target)
        expect(mock_client).to receive(:close)

        allow(plugin).to receive(:create_request_aggregator).and_return(mock_aggregator)
        expect(mock_aggregator).to receive(:create_request).and_return(mock_aggregator_request)
        expect(mock_aggregator).to receive(:await).and_return({})
        expect(mock_aggregator).to receive(:close)

        expect(mock_aggregator_request).to receive(:get)
        expect(mock_aggregator_request).to receive(:get_result_async)

        plugin.register
      end

      it "returns from run" do
        Thread.new(queue) { |queue| loop { queue.pop } }

        plugin_thread = Thread.new(plugin, queue) { |subject, queue| subject.run(queue) }
        sleep 0.5
        expect(plugin_thread).to be_alive

        plugin.do_stop
        plugin.do_close
        wait(3).for { plugin_thread }.to_not be_alive
      end
    end
  end

  context "OIDs options validation" do
    valid_configs = [
            {"get" => ["1.3.6.1.2.1.1.1.0"], "hosts" => [{"host" => "udp:127.0.0.1/161"}]},
            {"get" => [".1.3.6.1.2.1.1.1.0"], "hosts" => [{"host" => "udp:127.0.0.1/161"}]},
            {"get" => [".1.3.6.1.2.1.1.1.0"], "hosts" => [{"host" => "udp:127.0.0.1/161"}], "oid_root_skip" => 2},
            {"get" => [".1.3.6.1.2.1.1.1.0"], "hosts" => [{"host" => "udp:127.0.0.1/161"}], "oid_path_length" => 2},
            {"get" => ["1.3.6.1.2.1.1.1.0", "1.3.6.1.2.1.1"], "hosts" => [{"host" => "udp:127.0.0.1/161"}]},
            {"get" => ["1.3.6.1.2.1.1.1.0", ".1.3.6.1.2.1.1"], "hosts" => [{"host" => "udp:127.0.0.1/161"}]},
            {"walk" => ["1.3.6.1.2.1.1.1"], "hosts" => [{"host" => "udp:127.0.0.1/161"}]},
            {"tables" => [{"name" => "ltmPoolStatTable", "columns" => ["1.3.6.1.4.1.3375.2.2.5.2.3.1.1", "1.3.6.1.4.1.3375.2.2.5.2.3.1.6"]}], "hosts" => [{"host" => "udp:127.0.0.1/161"}]},
    ]

    invalid_configs = [
          {"get" => ["1.3.6.1.2.1.1.1.a"], "hosts" => [{"host" => "udp:127.0.0.1/161"}]},
          {"get" => ["test"], "hosts" => [{"host" => "udp:127.0.0.1/161"}]},
          {"get" => [], "hosts" => [{"host" => "udp:127.0.0.1/161"}]},
          {"get" => "foo", "hosts" => [{"host" => "udp:127.0.0.1/161"}]},
          {"get" => [".1.3.6.1.2.1.1.1.0"], "hosts" => [{"host" => "udp:127.0.0.1/161"}], "oid_path_length" => "a" },
          {"get" => [".1.3.6.1.2.1.1.1.0"], "hosts" => [{"host" => "udp:127.0.0.1/161"}], "oid_path_length" => 2, "oid_root_skip" => 2 },
          {"walk" => "foo", "hosts" => [{"host" => "udp:127.0.0.1/161"}]},
          {"tables" => [{"columns" => ["1.2.3.4", "4.3.2.1"]}], "hosts" => [{"host" => "udp:127.0.0.1/161"}]},
    ]

    context "validates get oids" do
      valid_configs.each_with_index do |config, index|
        context "with valid config #{index}" do
          let(:config) { config }
          it 'should not raise' do
            expect{ plugin.register }.not_to raise_error
          end
        end
      end

      invalid_configs.each_with_index do |config, index|
        context "with invalid config #{index}" do
          let(:config) { config }
          it 'should raise' do
            expect{ plugin.register }.to raise_error(LogStash::ConfigurationError)
          end
        end
      end
    end
  end

  context "hosts options validation" do
    valid_configs = [
          {"get" => ["1.0"], "hosts" => [{"host" => "udp:127.0.0.1/161"}]},
          {"get" => ["1.0"], "hosts" => [{"host" => "udp:127.0.0.1/161"}]},
          {"get" => ["1.0"], "hosts" => [{"host" => "udp:localhost/161"}]},
          {"get" => ["1.0"], "hosts" => [{"host" => "udp:127.0.0.1/112345"}]},
          {"get" => ["1.0"], "hosts" => [{"host" => "udp:127.0.0.1/161", "community" => "public"}]},
          {"get" => ["1.0"], "hosts" => [{"host" => "tcp:127.0.0.1/112345"}]},
          {"get" => ["1.0"], "hosts" => [{"host" => "tcp:127.0.0.1/161", "community" => "public"}]},
          {"get" => ["1.0"], "hosts" => [{"host" => "udp:127.0.0.1/161", "version" => "1"}]},
          {"get" => ["1.0"], "hosts" => [{"host" => "udp:127.0.0.1/161", "version" => "2c"}]},
          {"get" => ["1.0"], "hosts" => [{"host" => "udp:127.0.0.1/161", "version" => "3"}], "security_name" => "v3user"},

          {"get" => ["1.0"], "hosts" => [{"host" => "udp:[::1]/16100"}]},
          {"get" => ["1.0"], "hosts" => [{"host" => "udp:[2001:db8:0:1:1:1:1:1]/16100"}]},
          {"get" => ["1.0"], "hosts" => [{"host" => "udp:[2001:db8::2:1]/161"}]},
    ]

    invalid_configs = [
          {"get" => ["1.0"], "hosts" => [{"host" => "aaa:127.0.0.1/161"}]},
          {"get" => ["1.0"], "hosts" => [{"host" => "tcp.127.0.0.1/161"}]},
          {"get" => ["1.0"], "hosts" => [{"host" => "localhost"}]},
          {"get" => ["1.0"], "hosts" => [{"host" => "localhost/161"}]},
          {"get" => ["1.0"], "hosts" => [{"host" => "udp:127.0.0.1"}]},
          {"get" => ["1.0"], "hosts" => [{"host" => "udp:127.0.0.1/"}]},
          {"get" => ["1.0"], "hosts" => [{"host" => "udp:127.0.0.1/aaa"}]},
          {"get" => ["1.0"], "hosts" => [{"host" => "udp:127.0.0.1/161"}, {"host" => "udp:127.0.0.1/aaa"}]},
          {"get" => ["1.0"], "hosts" => [{"host" => "udp:127.0.0.1/161", "version" => "2"}]},
          {"get" => ["1.0"], "hosts" => [{"host" => "udp:127.0.0.1/161", "version" => "3a"}]},
          {"get" => ["1.0"], "hosts" => ""},
          {"get" => ["1.0"], "hosts" => []},
          {"get" => ["1.0"] },
    ]

    context "validates hosts" do
      valid_configs.each_with_index do |config, index|
        context "with valid config #{index}" do
          let(:config) { config }
          it 'should not raise' do
            expect{ plugin.register }.not_to raise_error
          end
        end
      end

      invalid_configs.each_with_index do |config, index|
        context "with invalid config #{index}" do
          let(:config) { config }
          it 'should raise' do
            expect{ plugin.register }.to raise_error(LogStash::ConfigurationError)
          end
        end
      end
    end
  end

  context "v3_users options validation" do
    valid_configs = [
	    {"get" => ["1.0"], "hosts" => [{"host" => "udp:127.0.0.1/161", "version" => "3"}], "security_name" => "ciscov3", "auth_protocol" => "sha", "auth_pass" => "myshapass", "priv_protocol" => "aes", "priv_pass" => "myprivpass", "security_level" => "authNoPriv"},
	    {"get" => ["1.0"], "hosts" => [{"host" => "udp:[2001:db8:0:1:1:1:1:1]/1610", "version" => "3"}], "security_name" => "dellv3", "auth_protocol" => "md5", "auth_pass" => "myshapass", "priv_protocol" => "3des", "priv_pass" => "myprivpass", "security_level" => "authNoPriv"}
    ]

    invalid_configs = [
	    {"get" => ["1.0"], "hosts" => [{"host" => "udp:127.0.0.1/161", "version" => "3"}], "security_name" => "ciscov3", "auth_protocol" => "badauth", "auth_pass" => "myshapass", "priv_protocol" => "aes", "priv_pass" => "myprivpass", "security_level" => "authNoPriv"},
	    {"get" => ["1.0"], "hosts" => [{"host" => "udp:127.0.0.1/161", "version" => "3"}], "security_name" => "ciscov3", "auth_protocol" => "sha"}
    ]

    context "validates v3_users" do
      valid_configs.each_with_index do |config, index|
        context "with valid config #{index}" do
          let(:config) { config }
          it 'should not raise' do
            expect{ plugin.register }.not_to raise_error
          end
        end
      end

      invalid_configs.each_with_index do |config, index|
        context "with invalid config #{index}" do
          let(:config) { config }
          it 'should raise' do
            expect{ plugin.register }.to raise_error(LogStash::ConfigurationError)
          end
        end
      end
    end
  end

  describe 'local_engine_id validation' do
    let(:local_engine_id) { nil }
    let(:config) { super().merge('get' => ["1.0"], 'hosts' => [{'host' => "udp:127.0.0.1/161", "version" => "2c"}], 'local_engine_id' => local_engine_id) }

    context 'with length lower than 5' do
      let(:local_engine_id) { '1234' }

      it 'should raise' do
        error_message = '`local_engine_id` length must be greater or equal than 5'
        expect { plugin.register }.to raise_error(LogStash::ConfigurationError, error_message)
      end
    end

    context 'with valid length' do
      let(:local_engine_id) { '0' * 32 }

      it 'should not raise' do
        expect { plugin.register }.to_not raise_error
      end
    end

    context 'with length greater than 32' do
      let(:local_engine_id) { '0' * 33 }

      it 'should raise' do
        error_message = '`local_engine_id` length must be lower or equal than 32'
        expect { plugin.register }.to raise_error(LogStash::ConfigurationError, error_message)
      end
    end
  end

  ecs_compatibility_matrix(:disabled, :v1, :v8) do |ecs_select|
    let(:queue) { Queue.new }
    let(:run_once_runner) { RunOnceStoppableIntervalRunner.new(plugin) }
    let(:config) { { 'ecs_compatibility' => ecs_select.active_mode } }

    before(:each) do
      allow(plugin).to receive(:stoppable_interval_runner).and_return(run_once_runner)
      allow(plugin).to receive(:build_client!).and_return(mock_client)
      allow(mock_client).to receive(:listen)
      allow(mock_client).to receive(:create_target).and_return(mock_target)
      allow(mock_client).to receive(:close).at_most(:once)

      allow(plugin).to receive(:create_request_aggregator).and_return(mock_aggregator)
      expect(mock_aggregator).to receive(:create_request).and_return(mock_aggregator_request)
      expect(mock_aggregator).to receive(:await)
      allow(mock_aggregator).to receive(:close)
    end

    context 'mocked get' do
      before(:each) do
        expect(mock_aggregator_request).to receive(:get)
        expect(mock_aggregator_request).to receive(:get_result_async) do |consumer|
          consumer.call({ 'foo' => 'bar' })
        end
      end

      context 'should add' do
        let(:config) { super().merge({ 'get' => ["1.3.6.1.2.1.1.1.0"], 'hosts' => [{ 'host' => "udp:127.0.0.1/161", 'community' => "public" }] }) }

        it "@metadata and host fields to event" do
          plugin.register
          plugin.run(queue)
          event = queue.pop

          if ecs_select.active_mode == :disabled
            expect(event.get("[@metadata][host_protocol]")).to eq("udp")
            expect(event.get("[@metadata][host_address]")).to eq("127.0.0.1")
            expect(event.get("[@metadata][host_port]")).to eq("161")
            expect(event.get("[@metadata][host_community]")).to eq("public")
            expect(event.get("host")).to eql("127.0.0.1")
          else
            expect(event.get("[@metadata][input][snmp][host][protocol]")).to eq("udp")
            expect(event.get("[@metadata][input][snmp][host][address]")).to eq("127.0.0.1")
            expect(event.get("[@metadata][input][snmp][host][port]")).to eq('161')
            expect(event.get("[@metadata][input][snmp][host][community]")).to eq("public")
            expect(event.get("host")).to eql('ip' => "127.0.0.1")
          end
        end
      end

      context 'with custom host field (legacy metadata)' do
        let(:config) do
          super().merge({
            'get' => ["1.3.6.1.2.1.1.1.0"],
            'hosts' => [{ 'host' => "udp:127.0.0.1/161", 'community' => "public" }],
            'add_field' => { 'host' => '%{[@metadata][host_protocol]}:%{[@metadata][host_address]}/%{[@metadata][host_port]},%{[@metadata][host_community]}' }
          })
        end

        it "should add field to event" do
          plugin.register
          plugin.run(queue)
          event = queue.pop

          expect(event.get("host")).to eq("udp:127.0.0.1/161,public")
        end
      end if ecs_select.active_mode == :disabled

      context "with custom host field (ECS mode)" do
        let(:config) do
          super().merge({
            'get' => ["1.3.6.1.2.1.1.1.0"],
            'hosts' => [{ 'host' => "tcp:192.168.1.11/1161" }],
            'add_field' => { '[host][formatted]' => '%{[@metadata][input][snmp][host][protocol]}://%{[@metadata][input][snmp][host][address]}:%{[@metadata][input][snmp][host][port]}' }
          })
        end

        it "should add field to event" do
          plugin.register
          plugin.run(queue)
          event = queue.pop

          expect(event.get("host")).to eq('formatted' => "tcp://192.168.1.11:1161")
        end
      end if ecs_select.active_mode != :disabled

      context 'with target configured' do
        let(:config) do
          super().merge({
            'get' => ['1.3.6.1.2.1.1.1.0'],
            'hosts' => [{ 'host' => 'udp:127.0.0.1/161', 'community' => 'public' }],
            'target' => 'snmp_data'
          })
        end

        it 'should target event data' do
          plugin.register
          plugin.run(queue)
          event = queue.pop

          expect( event.include?('foo') ).to be false
          expect( event.get('[snmp_data]') ).to eql 'foo' => 'bar'
        end
      end
    end

    context 'mocked empty request result' do
      let(:config) do
        super().merge({
          'get' => ['1.3.6.1.2.1.1.1.0'],
          'hosts' => [{ 'host' => 'udp:127.0.0.1/161', 'community' => 'public' }]
        })
      end

      let(:logger) { double("Logger").as_null_object }

      before(:each) do
        expect(mock_aggregator_request).to receive(:get)
        expect(mock_aggregator_request).to receive(:get_result_async) do |consumer|
          consumer.call({})
        end

        allow(plugin).to receive(:logger).and_return(logger)
        expect(logger).to receive(:debug?).and_return(true)
        expect(logger).to receive(:debug).with('No SNMP data retrieved', anything)
      end

      it 'generates no events when client returns no response' do
        plugin.register
        plugin.poll_clients(queue)

        expect(queue.size).to eql 0
      end
    end

    context 'mocked no request response' do
      let(:config) do
        super().merge({
            'walk' => ["1.3.6.1.2.1.1"],
            "hosts" => [{"host" => "udp:127.0.0.1/161", "community" => "public"}]
        })
      end

      let(:logger) { double("Logger").as_null_object }

      before do
        expect(mock_aggregator_request).to receive(:walk)
        expect(mock_aggregator_request).to receive(:get_result_async)
      end

      it 'generates no events when client returns no response' do
        plugin.register
        plugin.poll_clients(queue)

        expect(queue.size).to eql 0
      end
    end
  end

  context "StoppableIntervalRunner" do
    let(:stop_holder) { Struct.new(:value).new(false) }

    before(:each) do
      allow(plugin).to receive(:stop?) { stop_holder.value }
    end

    let(:plugin) do
      double("Plugin").tap do |dbl|
        allow(dbl).to receive(:logger).and_return(double("Logger").as_null_object)
        allow(dbl).to receive(:stop?) { stop_holder.value }
      end
    end

    subject(:interval_runner) { LogStash::Inputs::Snmp::StoppableIntervalRunner.new(plugin) }

    context "#every" do
      context "when the plugin is stopped" do
        let(:interval_seconds) { 2 }
        it 'does not yield the block' do
          stop_holder.value = true
          expect { |yielder| interval_runner.every(interval_seconds, &yielder) }.to_not yield_control
        end
      end

      context "when the yield takes shorter than the interval" do
        let(:duration_seconds) { 1 }
        let(:interval_seconds) { 2 }

        it 'sleeps off the remainder' do
          allow(interval_runner).to receive(:sleep).and_call_original

          interval_runner.every(interval_seconds) do
            Kernel::sleep(duration_seconds) # non-stoppable
            stop_holder.value = true # prevent re-runs
          end

          expect(interval_runner).to have_received(:sleep).with(a_value_within(0.1).of(1))
        end
      end

      context "when the yield takes longer than the interval" do
        let(:duration_seconds) { 2 }
        let(:interval_seconds) { 1 }

        it 'logs a warning to the plugin' do
          allow(interval_runner).to receive(:sleep).and_call_original

          interval_runner.every(interval_seconds) do
            Kernel::sleep(duration_seconds) # non-stoppable
            stop_holder.value = true # prevent re-runs
          end

          expect(interval_runner).to_not have_received(:sleep)

          expect(plugin.logger).to have_received(:warn).with(a_string_including("took longer"), a_hash_including(:interval_seconds => interval_seconds, :duration_seconds => a_value_within(0.1).of(duration_seconds)))
        end
      end

      it 'runs regularly until the plugin is stopped' do
        timestamps = []

        thread = Thread.new do
          interval_runner.every(1) do
            timestamps << Time.now
            Kernel::sleep(Random::rand(0.8))
          end
        end

        Kernel::sleep(5)
        expect(thread).to be_alive

        stop_holder.value = true
        Kernel::sleep(1)

        aggregate_failures do
          expect(thread).to_not be_alive
          expect(timestamps.count).to be_within(1).of(5)

          timestamps.each_cons(2) do |previous, current|
            # ensure each start time is very close to 1s after the previous.
            expect(current - previous).to be_within(0.05).of(1)
          end

          thread.kill if thread.alive?
        end
      end
    end
  end

  context "close" do
    let(:config) do
      {
        'get' => ["1.3.6.1.2.1.1.1.0"],
        'hosts' => [{'host' => "udp:127.0.0.1/161", 'community' => "public"}]
      }
    end

    let(:run_once_runner) { RunOnceStoppableIntervalRunner.new(plugin) }

    before(:each) do
      allow(plugin).to receive(:stoppable_interval_runner).and_return(run_once_runner)
      allow(plugin).to receive(:build_client!).and_return(mock_client)
      allow(mock_client).to receive(:listen)
      allow(mock_client).to receive(:create_target).and_return(mock_target)
      allow(mock_client).to receive(:close)

      allow(plugin).to receive(:create_request_aggregator).and_return(mock_aggregator)
      expect(mock_aggregator).to receive(:await)
      allow(mock_aggregator).to receive(:close)

      expect(mock_aggregator).to receive(:create_request).and_return(mock_aggregator_request)
      expect(mock_aggregator_request).to receive(:get)
      expect(mock_aggregator_request).to receive(:get_result_async)
    end

    it "should call client close method upon termination" do
      plugin.register
      plugin.run(Queue.new)
      plugin.do_close

      expect(mock_client).to have_received(:close).once
      expect(mock_aggregator).to have_received(:close).once
    end
  end
end

class RunOnceStoppableIntervalRunner
  def initialize(plugin)
    @plugin = plugin
  end

  def every(interval_seconds, desc = 'operation', &block)
    yield
  end
end