Sha256: daf3cf1422bcd0e296bcfaecb63881b3c66e070fb7ae11ee10c9b9598928ac51

Contents?: true

Size: 1.61 KB

Versions: 22

Compression:

Stored size: 1.61 KB

Contents

# frozen_string_literal: true

module Karafka
  module Swarm
    # Simple listener for swarm nodes that:
    #   - reports once in a while to make sure that supervisor is aware we do not hang
    #   - makes sure we did not become an orphan and if so, exits
    class LivenessListener
      include Karafka::Core::Helpers::Time
      include Helpers::ConfigImporter.new(
        node: %i[swarm node],
        liveness_interval: %i[internal swarm liveness_interval],
        orphaned_exit_code: %i[internal swarm orphaned_exit_code]
      )

      def initialize
        @last_checked_at = 0
        @mutex = Mutex.new
      end

      # Since there may be many statistics emitted from multiple listeners, we do not want to write
      # statuses that often. Instead we do it only once in a while which should be enough
      #
      # While this may provide a small lag in the orphaned detection, it does not really matter
      # as it will be picked up fast enough.
      # @param _event [Karafka::Core::Monitoring::Event]
      def on_statistics_emitted(_event)
        periodically do
          Kernel.exit!(orphaned_exit_code) if node.orphaned?

          node.healthy
        end
      end

      private

      # Wraps the logic with a mutex
      # @param block [Proc] code we want to run in mutex
      def synchronize(&block)
        @mutex.synchronize(&block)
      end

      # Runs requested code once in a while
      def periodically
        return if monotonic_now - @last_checked_at < liveness_interval

        synchronize do
          @last_checked_at = monotonic_now

          yield
        end
      end
    end
  end
end

Version data entries

22 entries across 22 versions & 1 rubygems

Version Path
karafka-2.4.17 lib/karafka/swarm/liveness_listener.rb
karafka-2.4.16 lib/karafka/swarm/liveness_listener.rb
karafka-2.4.15 lib/karafka/swarm/liveness_listener.rb
karafka-2.4.14 lib/karafka/swarm/liveness_listener.rb
karafka-2.4.13 lib/karafka/swarm/liveness_listener.rb
karafka-2.4.12 lib/karafka/swarm/liveness_listener.rb
karafka-2.4.11 lib/karafka/swarm/liveness_listener.rb
karafka-2.4.10 lib/karafka/swarm/liveness_listener.rb
karafka-2.4.9 lib/karafka/swarm/liveness_listener.rb
karafka-2.4.8 lib/karafka/swarm/liveness_listener.rb
karafka-2.4.7 lib/karafka/swarm/liveness_listener.rb
karafka-2.4.6 lib/karafka/swarm/liveness_listener.rb
karafka-2.4.5 lib/karafka/swarm/liveness_listener.rb
karafka-2.4.4 lib/karafka/swarm/liveness_listener.rb
karafka-2.4.3 lib/karafka/swarm/liveness_listener.rb
karafka-2.4.0 lib/karafka/swarm/liveness_listener.rb
karafka-2.4.0.rc1 lib/karafka/swarm/liveness_listener.rb
karafka-2.3.4 lib/karafka/swarm/liveness_listener.rb
karafka-2.4.0.beta2 lib/karafka/swarm/liveness_listener.rb
karafka-2.4.0.beta1 lib/karafka/swarm/liveness_listener.rb