Sha256: 53911be229680802a6a865fde889fa658ba7e2e372862f28071164907d233a5e

Contents?: true

Size: 1.71 KB

Versions: 25

Compression:

Stored size: 1.71 KB

Contents

# frozen_string_literal: true

module Karafka
  module Core
    module Monitoring
      # Karafka monitor that can be used to pass through instrumentation calls to selected
      # notifications bus.
      #
      # It provides abstraction layer that allows us to use both our internal notifications as well
      # as `ActiveSupport::Notifications`.
      class Monitor
        # Empty has to save on objects allocation
        EMPTY_HASH = {}.freeze

        private_constant :EMPTY_HASH

        # @param notifications_bus [Object] either our internal notifications bus or
        #   `ActiveSupport::Notifications`
        # @param namespace [String, nil] namespace for events or nil if no namespace
        def initialize(notifications_bus, namespace = nil)
          @notifications_bus = notifications_bus
          @namespace = namespace
          @mapped_events = Concurrent::Map.new
        end

        # Passes the instrumentation block (if any) into the notifications bus
        #
        # @param event_id [String, Symbol] event id
        # @param payload [Hash]
        # @param block [Proc] block we want to instrument (if any)
        def instrument(event_id, payload = EMPTY_HASH, &block)
          full_event_name = @mapped_events[event_id] ||= [event_id, @namespace].compact.join('.')

          @notifications_bus.instrument(full_event_name, payload, &block)
        end

        # Allows us to subscribe to the notification bus
        #
        # @param args [Array] any arguments that the notification bus subscription layer accepts
        # @param block [Proc] optional block for subscription
        def subscribe(*args, &block)
          @notifications_bus.subscribe(*args, &block)
        end
      end
    end
  end
end

Version data entries

25 entries across 25 versions & 1 rubygems

Version Path
karafka-core-2.2.7 lib/karafka/core/monitoring/monitor.rb
karafka-core-2.2.6 lib/karafka/core/monitoring/monitor.rb
karafka-core-2.2.5 lib/karafka/core/monitoring/monitor.rb
karafka-core-2.2.4 lib/karafka/core/monitoring/monitor.rb
karafka-core-2.2.3 lib/karafka/core/monitoring/monitor.rb
karafka-core-2.2.2 lib/karafka/core/monitoring/monitor.rb
karafka-core-2.2.1 lib/karafka/core/monitoring/monitor.rb
karafka-core-2.2.0 lib/karafka/core/monitoring/monitor.rb
karafka-core-2.1.1 lib/karafka/core/monitoring/monitor.rb
karafka-core-2.1.0 lib/karafka/core/monitoring/monitor.rb
karafka-core-2.1.0.beta1 lib/karafka/core/monitoring/monitor.rb
karafka-core-2.0.13 lib/karafka/core/monitoring/monitor.rb
karafka-core-2.0.12 lib/karafka/core/monitoring/monitor.rb
karafka-core-2.0.11 lib/karafka/core/monitoring/monitor.rb
karafka-core-2.0.10 lib/karafka/core/monitoring/monitor.rb
karafka-core-2.0.9 lib/karafka/core/monitoring/monitor.rb
karafka-core-2.0.8 lib/karafka/core/monitoring/monitor.rb
karafka-core-2.0.7 lib/karafka/core/monitoring/monitor.rb
karafka-core-2.0.6 lib/karafka/core/monitoring/monitor.rb
karafka-core-2.0.5 lib/karafka/core/monitoring/monitor.rb