module Fluent
  class PingPortInput < Fluent::Input
    Plugin.register_input 'ping_port', self

    def initialize
      super
      require 'socket'
      require 'timeout'
    end

    config_param :tag, :string
    config_param :host, :string
    config_param :port, :string
    config_param :timeout, :integer, default: 1
    config_param :retry_count, :integer, default: 3
    config_param :interval, :time, default: '5m'

    def configure(conf)
      super
      @ports = @port.split(',')
      @state = @ports.inject({}) {|state, port|
        state[port] = 0
        state
      }
    end

    def start
      @thread = Thread.new(&method(:run))
    end

    def shutdown
      Thread.kill(@thread)
    end

    def run
      loop do
        Thread.new(&method(:emit_ping_port))
        sleep @interval
      end
    end

    def emit_ping_port
      begin
        @ports.each do |port|
          unless is_port_open?(@host, port, @timeout)
            @state[port] = @state[port] + 1
            if @state[port] >= @retry_count
              record = {
                'message' => "#{@host}:#{port} Connect Error."
              }
              Fluent::Engine.emit @tag, Fluent::Engine.now, record
              @state[port] = 0
            end
          else
            @state[port] = 0
          end
        end
      rescue => e
        log.error e
      end
    end

    private

    def is_port_open?(host, port, timeout)
      begin
        Timeout::timeout(timeout) do
          begin
            s = TCPSocket.new(host, port)
            s.close
            return true
          rescue Errno::ECONNREFUSED, Errno::EHOSTUNREACH
            return false
          end
        end
      rescue Timeout::Error
      end
      return false
    end    
  end
end