# frozen_string_literal: true

require 'socket'

module Karafka
  module Instrumentation
    module Vendors
      # Namespace for instrumentation related with Kubernetes
      module Kubernetes
        # Kubernetes HTTP listener that does not only reply when process is not fully hanging, but
        # also allows to define max time of processing and looping.
        #
        # Processes like Karafka server can hang while still being reachable. For example, in case
        # something would hang inside of the user code, Karafka could stop polling and no new
        # data would be processed, but process itself would still be active. This listener allows
        # for defining of a ttl that gets bumped on each poll loop and before and after processing
        # of a given messages batch.
        class LivenessListener
          include ::Karafka::Core::Helpers::Time

          # All good with Karafka
          OK_CODE = '204 No Content'

          # Some timeouts, fail
          FAIL_CODE = '500 Internal Server Error'

          private_constant :OK_CODE, :FAIL_CODE

          # @param hostname [String, nil] hostname or nil to bind on all
          # @param port [Integer] TCP port on which we want to run our HTTP status server
          # @param consuming_ttl [Integer] time in ms after which we consider consumption hanging.
          #   It allows us to define max consumption time after which k8s should consider given
          #   process as hanging
          # @param polling_ttl [Integer] max time in ms for polling. If polling (any) does not
          #   happen that often, process should be considered dead.
          # @note The default TTL matches the default `max.poll.interval.ms`
          def initialize(
            hostname: nil,
            port: 3000,
            consuming_ttl: 5 * 60 * 1_000,
            polling_ttl: 5 * 60 * 1_000
          )
            @server = TCPServer.new(*[hostname, port].compact)
            @polling_ttl = polling_ttl
            @consuming_ttl = consuming_ttl
            @mutex = Mutex.new
            @pollings = {}
            @consumptions = {}

            Thread.new do
              loop do
                break unless respond
              end
            end
          end

          # Tick on each fetch
          # @param _event [Karafka::Core::Monitoring::Event]
          def on_connection_listener_fetch_loop(_event)
            mark_polling_tick
          end

          # Tick on starting work
          # @param _event [Karafka::Core::Monitoring::Event]
          def on_consumer_consume(_event)
            mark_consumption_tick
          end

          # Tick on finished work
          # @param _event [Karafka::Core::Monitoring::Event]
          def on_consumer_consumed(_event)
            clear_consumption_tick
          end

          # @param _event [Karafka::Core::Monitoring::Event]
          def on_consumer_revoke(_event)
            mark_consumption_tick
          end

          # @param _event [Karafka::Core::Monitoring::Event]
          def on_consumer_revoked(_event)
            clear_consumption_tick
          end

          # @param _event [Karafka::Core::Monitoring::Event]
          def on_consumer_shutting_down(_event)
            mark_consumption_tick
          end

          # @param _event [Karafka::Core::Monitoring::Event]
          def on_consumer_shutdown(_event)
            clear_consumption_tick
          end

          # @param _event [Karafka::Core::Monitoring::Event]
          def on_error_occurred(_event)
            clear_consumption_tick
            clear_polling_tick
          end

          # Stop the http server when we stop the process
          # @param _event [Karafka::Core::Monitoring::Event]
          def on_app_stopped(_event)
            @server.close
          end

          private

          # Wraps the logic with a mutex
          # @param block [Proc] code we want to run in mutex
          def synchronize(&block)
            @mutex.synchronize(&block)
          end

          # @return [Integer] object id of the current thread
          def thread_id
            Thread.current.object_id
          end

          # Update the polling tick time for current thread
          def mark_polling_tick
            synchronize do
              @pollings[thread_id] = monotonic_now
            end
          end

          # Clear current thread polling time tracker
          def clear_polling_tick
            synchronize do
              @pollings.delete(thread_id)
            end
          end

          # Update the processing tick time
          def mark_consumption_tick
            synchronize do
              @consumptions[thread_id] = monotonic_now
            end
          end

          # Clear current thread consumption time tracker
          def clear_consumption_tick
            synchronize do
              @consumptions.delete(thread_id)
            end
          end

          # Responds to a HTTP request with the process liveness status
          def respond
            client = @server.accept
            client.gets
            client.print "HTTP/1.1 #{status}\r\n"
            client.print "Content-Type: text/plain\r\n"
            client.print "\r\n"
            client.close

            true
          rescue Errno::ECONNRESET, Errno::EPIPE, IOError
            !@server.closed?
          end

          # Did we exceed any of the ttls
          # @return [String] 204 string if ok, 500 otherwise
          def status
            time = monotonic_now

            return FAIL_CODE if @pollings.values.any? { |tick| (time - tick) > @polling_ttl }
            return FAIL_CODE if @consumptions.values.any? { |tick| (time - tick) > @consuming_ttl }

            OK_CODE
          end
        end
      end
    end
  end
end