Sha256: 76a60e4548dbe854cd9cff566caac38fbafc1a4f08c34bd4e9b914bbfa998475

Contents?: true

Size: 1.94 KB

Versions: 17

Compression:

Stored size: 1.94 KB

Contents

# frozen_string_literal: true

require 'socket'

module Karafka
  module Instrumentation
    module Vendors
      # Namespace for instrumentation related with Kubernetes
      module Kubernetes
        # Base Kubernetes Listener providing basic HTTP server capabilities to respond with health
        class BaseListener
          include ::Karafka::Core::Helpers::Time

          # All good with Karafka
          OK_CODE = '204 No Content'

          # Some timeouts, fail
          FAIL_CODE = '500 Internal Server Error'

          private_constant :OK_CODE, :FAIL_CODE

          # @param hostname [String, nil] hostname or nil to bind on all
          # @param port [Integer] TCP port on which we want to run our HTTP status server
          def initialize(
            hostname: nil,
            port: 3000
          )
            @hostname = hostname
            @port = port
          end

          private

          # @return [Boolean] true if all good, false if we should tell k8s to kill this process
          def healthy?
            raise NotImplementedError, 'Implement in a subclass'
          end

          # Responds to a HTTP request with the process liveness status
          def respond
            client = @server.accept
            client.gets
            client.print "HTTP/1.1 #{healthy? ? OK_CODE : FAIL_CODE}\r\n"
            client.print "Content-Type: text/plain\r\n"
            client.print "\r\n"
            client.close

            true
          rescue Errno::ECONNRESET, Errno::EPIPE, IOError
            !@server.closed?
          end

          # Starts background thread with micro-http monitoring
          def start
            @server = TCPServer.new(*[@hostname, @port].compact)

            Thread.new do
              loop do
                break unless respond
              end
            end
          end

          # Stops the server
          def stop
            @server.close
          end
        end
      end
    end
  end
end

Version data entries

17 entries across 17 versions & 1 rubygems

Version Path
karafka-2.4.12 lib/karafka/instrumentation/vendors/kubernetes/base_listener.rb
karafka-2.4.11 lib/karafka/instrumentation/vendors/kubernetes/base_listener.rb
karafka-2.4.10 lib/karafka/instrumentation/vendors/kubernetes/base_listener.rb
karafka-2.4.9 lib/karafka/instrumentation/vendors/kubernetes/base_listener.rb
karafka-2.4.8 lib/karafka/instrumentation/vendors/kubernetes/base_listener.rb
karafka-2.4.7 lib/karafka/instrumentation/vendors/kubernetes/base_listener.rb
karafka-2.4.6 lib/karafka/instrumentation/vendors/kubernetes/base_listener.rb
karafka-2.4.5 lib/karafka/instrumentation/vendors/kubernetes/base_listener.rb
karafka-2.4.4 lib/karafka/instrumentation/vendors/kubernetes/base_listener.rb
karafka-2.4.3 lib/karafka/instrumentation/vendors/kubernetes/base_listener.rb
karafka-2.4.0 lib/karafka/instrumentation/vendors/kubernetes/base_listener.rb
karafka-2.4.0.rc1 lib/karafka/instrumentation/vendors/kubernetes/base_listener.rb
karafka-2.3.4 lib/karafka/instrumentation/vendors/kubernetes/base_listener.rb
karafka-2.4.0.beta2 lib/karafka/instrumentation/vendors/kubernetes/base_listener.rb
karafka-2.4.0.beta1 lib/karafka/instrumentation/vendors/kubernetes/base_listener.rb
karafka-2.3.3 lib/karafka/instrumentation/vendors/kubernetes/base_listener.rb
karafka-2.3.2 lib/karafka/instrumentation/vendors/kubernetes/base_listener.rb