# A connection to a remote site or supervisor.
# Uses the Task module to handle asyncronous work, but adds
# the concept of a connection that can be connected or disconnected.

require 'rubygems'

module RSMP
  class Proxy
    WRAPPING_DELIMITER = "\f"

    include Logging
    include Distributor
    include Inspect
    include Task

    attr_reader :state, :archive, :connection_info, :sxl, :collector, :ip, :port, :node, :core_version
    def initialize options
      @node = options[:node]
      initialize_logging options
      initialize_distributor
      initialize_task
      setup options
      clear
      @state = :disconnected
      @state_condition = Async::Notification.new
    end


    def now
      node.now
    end

    def disconnect
    end


    # wait for the reader task to complete,
    # which is not expected to happen before the connection is closed
    def wait_for_reader
      @reader.wait if @reader
    end

    # close connection, but keep our main task running so we can reconnect
    def close
      log "Closing connection", level: :warning
      close_stream
      close_socket
      stop_reader
      set_state :disconnected
      distribute_error DisconnectError.new("Connection was closed")

      # stop timer
      # as we're running inside the timer, code after stop_timer() will not be called,
      # unless it's in the ensure block
      stop_timer
    end

    def stop_subtasks
      stop_timer
      stop_reader
      clear
      super
    end

    def stop_timer
      @timer.stop if @timer
    ensure
      @timer = nil
    end

    def stop_reader
      @reader.stop if @reader
    ensure
      @reader = nil
    end

    def close_stream
      @stream.close if @stream
    ensure
      @stream = nil
    end

    def close_socket
      @socket.close if @socket
    ensure
      @socket = nil
    end

    def stop_task
      close
      super
    end

    # change our state
    def set_state state
      return if state == @state
      @state = state
      state_changed
    end

    # the state changed
    # override to to things like notifications
    def state_changed
      @state_condition.signal @state
    end

    # revive after a reconnect
    def revive options
      setup options
    end

    def setup options
      @settings = options[:settings]
      @socket = options[:socket]
      @stream = options[:stream]
      @protocol = options[:protocol]
      @ip = options[:ip]
      @port = options[:port]
      @connection_info = options[:info]
      @sxl = nil
      @site_settings = nil  # can't pick until we know the site id
      if options[:collect]
        @collector = RSMP::Collector.new self, options[:collect]
        @collector.start
      end
    end

    def inspect
      "#<#{self.class.name}:#{self.object_id}, #{inspector(
        :@acknowledgements,:@settings,:@site_settings
        )}>"
    end

    def clock
      @node.clock
    end

    def ready?
      @state == :ready
    end

    def connected?
      @state == :connected || @state == :ready
    end

    def disconnected?
      @state == :disconnected
    end

    def clear
      @awaiting_acknowledgement = {}
      @latest_watchdog_received = nil
      @watchdog_started = false
      @version_determined = false
      @ingoing_acknowledged = {}
      @outgoing_acknowledged = {}
      @latest_watchdog_send_at = nil

      @acknowledgements = {}
      @acknowledgement_condition = Async::Notification.new
    end

    # run an async task that reads from @socket
    def start_reader
      @reader = @task.async do |task|
        task.annotate "reader"
        run_reader
      end
    end

    def run_reader
      @stream ||= Async::IO::Stream.new(@socket)
      @protocol ||= Async::IO::Protocol::Line.new(@stream,WRAPPING_DELIMITER) # rsmp messages are json terminated with a form-feed
      loop do
        read_line
      end
    rescue Restart
      log "Closing connection", level: :warning
      raise
    rescue Async::Wrapper::Cancelled
      # ignore exceptions raised when a wait is aborted because a task is stopped
    rescue EOFError, Async::Stop
      log "Connection closed", level: :warning
    rescue IOError => e
      log "IOError: #{e}", level: :warning
    rescue Errno::ECONNRESET
      log "Connection reset by peer", level: :warning
    rescue Errno::EPIPE
      log "Broken pipe", level: :warning
    rescue StandardError => e
      distribute_error e, level: :internal
    end

    def read_line
      json = @protocol.read_line
      beginning = Time.now
      message = process_packet json
      duration = Time.now - beginning
      ms = (duration*1000).round(4)
      if duration > 0
        per_second = (1.0 / duration).round
      else
        per_second = Float::INFINITY
      end
      if message
        type = message.type
        m_id = Logger.shorten_message_id(message.m_id)
      else
        type = 'Unknown'
        m_id = nil
      end
      str = [type,m_id,"processed in #{ms}ms, #{per_second}req/s"].compact.join(' ')
      log str, level: :statistics
    end

    def receive_error e, options={}
      @node.receive_error e, options
    end

    def start_watchdog
      log "Starting watchdog with interval #{@site_settings['intervals']['watchdog']} seconds", level: :debug
      @watchdog_started = true
    end

    def stop_watchdog
      log "Stopping watchdog", level: :debug
      @watchdog_started = false
    end

    def with_watchdog_disabled
      was = @watchdog_started
      stop_watchdog if was
      yield
    ensure
      start_watchdog if was
    end


    def start_timer
      return if @timer
      name = "timer"
      interval = @site_settings['intervals']['timer'] || 1
      log "Starting #{name} with interval #{interval} seconds", level: :debug
      @latest_watchdog_received = Clock.now
      @timer = @task.async do |task|
        task.annotate "timer"
        run_timer task, interval
      end
    end

    def run_timer task, interval
      next_time = Time.now.to_f
      loop do
        begin
          now = Clock.now
          timer(now)
        rescue RSMP::Schema::Error => e
          log "Timer: Schema error: #{e}", level: :warning
        rescue EOFError => e
          log "Timer: Connection closed: #{e}", level: :warning
        rescue IOError => e
          log "Timer: IOError", level: :warning
        rescue Errno::ECONNRESET
          log "Timer: Connection reset by peer", level: :warning
        rescue Errno::EPIPE => e
          log "Timer: Broken pipe", level: :warning
        rescue StandardError => e
          distribute_error e, level: :internal
        end
      ensure
        next_time += interval
        duration = next_time - Time.now.to_f
        task.sleep duration
      end
    end

    def timer now
      watchdog_send_timer now
      check_ack_timeout now
      check_watchdog_timeout now
    end

    def watchdog_send_timer now
      return unless @watchdog_started
      return if @site_settings['intervals']['watchdog'] == :never
      if @latest_watchdog_send_at == nil
        send_watchdog now
      else
        # we add half the timer interval to pick the timer
        # event closes to the wanted wathcdog interval
        diff = now - @latest_watchdog_send_at
        if (diff + 0.5*@site_settings['intervals']['timer']) >= (@site_settings['intervals']['watchdog'])
          send_watchdog now
        end
      end
    end

    def send_watchdog now=Clock.now
      message = Watchdog.new( {"wTs" => clock.to_s})
      send_message message
      @latest_watchdog_send_at = now
    end

    def check_ack_timeout now
      timeout = @site_settings['timeouts']['acknowledgement']
      # hash cannot be modify during iteration, so clone it
      @awaiting_acknowledgement.clone.each_pair do |m_id, message|
        latest = message.timestamp + timeout
        if now > latest
          str = "No acknowledgements for #{message.type} #{message.m_id_short} within #{timeout} seconds"
          log str, level: :error
          begin
            close
          ensure
            distribute_error MissingAcknowledgment.new(str)
          end
        end
      end
    end

    def check_watchdog_timeout now
      timeout = @site_settings['timeouts']['watchdog']
      latest = @latest_watchdog_received + timeout
      left = latest - now
      if left < 0
        str = "No Watchdog received within #{timeout} seconds"
        log str, level: :warning
        distribute MissingWatchdog.new(str)
      end
    end

    def log str, options={}
      super str, options.merge(ip: @ip, port: @port, site_id: @site_id)
    end

    def get_schemas
      schemas = { core: RSMP::Schema.latest_core_version } # use latest core
      schemas[:core] = core_version if core_version
      schemas[sxl] = RSMP::Schema.sanitize_version(sxl_version.to_s) if sxl && sxl_version
      schemas
    end

    def send_message message, reason=nil, validate: true
      raise NotReady unless connected?
      raise IOError unless @protocol
      message.direction = :out
      message.generate_json
      message.validate get_schemas unless validate==false
      @protocol.write_lines message.json
      expect_acknowledgement message
      distribute message
      log_send message, reason
    rescue EOFError, IOError
      buffer_message message
    rescue SchemaError, RSMP::Schema::Error => e
      str = "Could not send #{message.type} because schema validation failed: #{e.message}"
      log str, message: message, level: :error
      distribute_error e.exception("#{str} #{message.json}")
    end

    def buffer_message message
      # TODO
      #log "Cannot send #{message.type} because the connection is closed.", message: message, level: :error
    end

    def log_send message, reason=nil
      if reason
        str = "Sent #{message.type} #{reason}"
      else
        str = "Sent #{message.type}"
      end

      if message.type == "MessageNotAck"
        log str, message: message, level: :warning
      else
        log str, message: message, level: :log
      end
    end

    def should_validate_ingoing_message? message
      return true unless @site_settings
      skip = @site_settings.dig('skip_validation')
      return true unless skip
      klass = message.class.name.split('::').last
      !skip.include?(klass)
    end

    def process_deferred
      @node.process_deferred
    end

    def verify_sequence message
      expect_version_message(message) unless @version_determined
    end

    def process_packet json
      attributes = Message.parse_attributes json
      message = Message.build attributes, json
      message.validate(get_schemas) if should_validate_ingoing_message?(message)
      verify_sequence message
      with_deferred_distribution do
        distribute message
        process_message message
      end
      process_deferred
      message
    rescue InvalidPacket => e
      str = "Received invalid package, must be valid JSON but got #{json.size} bytes: #{e.message}"
      distribute_error e.exception(str)
      log str, level: :warning
      nil
    rescue MalformedMessage => e
      str = "Received malformed message, #{e.message}"
      distribute_error e.exception(str)
      log str, message: Malformed.new(attributes), level: :warning
      # cannot send NotAcknowledged for a malformed message since we can't read it, just ignore it
      nil
    rescue SchemaError, RSMP::Schema::Error => e
      reason = "schema errors: #{e.message}"
      str = "Received invalid #{message.type}"
      distribute_error e.exception(str), message: message
      dont_acknowledge message, str, reason
      message
    rescue InvalidMessage => e
      reason = "#{e.message}"
      str = "Received invalid #{message.type},"
      distribute_error e.exception("#{str} #{message.json}"), message: message
      dont_acknowledge message, str, reason
      message
    rescue FatalError => e
      reason = e.message
      str = "Rejected #{message.type},"
      distribute_error e.exception(str), message: message
      dont_acknowledge message, str, reason
      close
      message
    ensure
      @node.clear_deferred
    end

    def process_message message
      case message
        when MessageAck
          process_ack message
        when MessageNotAck
          process_not_ack message
        when Version
          process_version message
        when Watchdog
          process_watchdog message
        else
          dont_acknowledge message, "Received", "unknown message (#{message.type})"
      end
    end

    def will_not_handle message
      reason = "since we're a #{self.class.name.downcase}" unless reason
      log "Ignoring #{message.type}, #{reason}", message: message, level: :warning
      dont_acknowledge message, nil, reason
    end

    def expect_acknowledgement message
      unless message.is_a?(MessageAck) || message.is_a?(MessageNotAck)
        @awaiting_acknowledgement[message.m_id] = message
      end
    end

    def dont_expect_acknowledgement message
      @awaiting_acknowledgement.delete message.attribute("oMId")
    end

    def extraneous_version message
      dont_acknowledge message, "Received", "extraneous Version message"
    end

    def core_versions
      return [RSMP::Schema.latest_core_version] if @site_settings["core_versions"] == 'latest'
      return RSMP::Schema.core_versions if @site_settings["core_versions"] == 'all'
      [@site_settings["core_versions"]].flatten
    end

    def check_core_version message
      versions = core_versions
      # find versions that both we and the client support
      candidates = message.versions & versions
      if candidates.any?
        @core_version = candidates.sort_by { |v| Gem::Version.new(v) }.last  # pick latest version
      else
        raise HandshakeError.new "RSMP versions [#{message.versions.join(',')}] requested, but only [#{versions.join(',')}] supported."
      end
    end

    def process_version message
    end

    def acknowledge original
      raise InvalidArgument unless original
      ack = MessageAck.build_from(original)
      ack.original = original.clone
      send_message ack, "for #{ack.original.type} #{original.m_id_short}"
      check_ingoing_acknowledged original
    end

    def dont_acknowledge original, prefix=nil, reason=nil
      raise InvalidArgument unless original
      str = [prefix,reason].join(' ')
      log str, message: original, level: :warning if reason
      message = MessageNotAck.new({
        "oMId" => original.m_id,
        "rea" => reason || "Unknown reason"
      })
      message.original = original.clone
      send_message message, "for #{original.type} #{original.m_id_short}"
    end

    def wait_for_state state, timeout:
      states = [state].flatten
      return if states.include?(@state)
      wait_for_condition(@state_condition,timeout: timeout) do
        states.include?(@state)
      end
      @state
    rescue RSMP::TimeoutError
      raise RSMP::TimeoutError.new "Did not reach state #{state} within #{timeout}s"
    end

    def send_version site_id, core_versions
      if core_versions=='latest'
        versions = [RSMP::Schema.latest_core_version]
      elsif core_versions=='all'
        versions = RSMP::Schema.core_versions
      else
        versions = [core_versions].flatten
      end
      versions_array = versions.map {|v| {"vers" => v} }

      site_id_array = [site_id].flatten.map {|id| {"sId" => id} }

      version_response = Version.new({
        "RSMP"=>versions_array,
        "siteId"=>site_id_array,
        "SXL"=>sxl_version.to_s
      })
      send_message version_response
    end

    def find_original_for_message message
       @awaiting_acknowledgement[ message.attribute("oMId") ]
    end

    # TODO this might be better handled by a proper event machine using e.g. the EventMachine gem
    def check_outgoing_acknowledged message
      unless @outgoing_acknowledged[message.type]
        @outgoing_acknowledged[message.type] = true
        acknowledged_first_outgoing message
      end
    end

    def check_ingoing_acknowledged message
      unless @ingoing_acknowledged[message.type]
        @ingoing_acknowledged[message.type] = true
        acknowledged_first_ingoing message
      end
    end

    def acknowledged_first_outgoing message
    end

    def acknowledged_first_ingoing message
    end

    def process_ack message
      original = find_original_for_message message
      if original
        dont_expect_acknowledgement message
        message.original = original
        log_acknowledgement_for_original message, original

        case original.type
        when "Version"
          version_acknowledged
        when "StatusSubscribe"
          status_subscribe_acknowledged original
        end

        check_outgoing_acknowledged original

        @acknowledgements[ original.m_id ] = message
        @acknowledgement_condition.signal message
      else
        log_acknowledgement_for_unknown message
      end
    end

    def process_not_ack message
      original = find_original_for_message message
      if original
        dont_expect_acknowledgement message
        message.original = original
        log_acknowledgement_for_original message, original
        @acknowledgements[ original.m_id ] = message
        @acknowledgement_condition.signal message
      else
        log_acknowledgement_for_unknown message
      end
    end

    def log_acknowledgement_for_original message, original
      str = "Received #{message.type} for #{original.type} #{message.attribute("oMId")[0..3]}"
      if message.type == 'MessageNotAck'
        reason = message.attributes["rea"]
        str = "#{str}: #{reason}" if reason
        log str, message: message, level: :warning
      else
        log str, message: message, level: :log
      end
    end

    def log_acknowledgement_for_unknown message
      log "Received #{message.type} for unknown message #{message.attribute("oMId")[0..3]}", message: message, level: :warning
    end

    def process_watchdog message
      log "Received #{message.type}", message: message, level: :log
      @latest_watchdog_received = Clock.now
      acknowledge message
    end

    def expect_version_message message
      unless message.is_a?(Version) || message.is_a?(MessageAck) || message.is_a?(MessageNotAck)
        raise HandshakeError.new "Version must be received first"
      end
    end

    def handshake_complete
      set_state :ready
    end

    def version_acknowledged
    end

    def author
      @node.site_id
    end

    def send_and_optionally_collect message, options, &block
      collect_options = options[:collect] || options[:collect!]
      if collect_options
        task = @task.async do |task|
          task.annotate 'send_and_optionally_collect'
          collector = yield collect_options     # call block to create collector
          collector.collect
          collector.ok! if options[:collect!]   # raise any errors if the bang version was specified
          collector
        end

        send_message message, validate: options[:validate]
        { sent: message, collector: task.wait }
      else
        send_message message, validate: options[:validate]
        return { sent: message }
      end
    end

    def set_nts_message_attributes message
      message.attributes['ntsOId'] = (main && main.ntsOId) ? main.ntsOId : ''
      message.attributes['xNId'] = (main && main.xNId) ? main.xNId : ''
    end

    # Use Gem class to check version requirement
    # Requirement must be a string like '1.1', '>=1.0.3' or '<2.1.4',
    # or list of strings, like ['<=1.4','<1.5']
    def self.version_meets_requirement? version, requirement
      Gem::Requirement.new(requirement).satisfied_by?(Gem::Version.new(version))
    end

    def status_subscribe_acknowledged original
      component = find_component original.attribute('cId')
      return unless component
      short = Message.shorten_m_id original.m_id
      subscribe_list = original.attributes['sS']
      log "StatusSubscribe #{short} acknowledged, allowing repeated status values for #{subscribe_list}", level: :info
      component.allow_repeat_updates subscribe_list
    end
  end
end