Sha256: d97a12beab7c650dc73eabaeeceafb20bb2c4bf0c4d96115df483dc35aff3b73

Contents?: true

Size: 1.44 KB

Versions: 1

Compression:

Stored size: 1.44 KB

Contents

module Kafka
  # Publishes instrumentation events through ActiveSupport::Notifications when
  # that library has been loaded, and degrades to a plain pass-through when it
  # has not.
  class Instrumenter
    # Suffix appended to every event name ("request" becomes "request.kafka").
    # Frozen so that the shared constant cannot be mutated in place.
    NAMESPACE = "kafka".freeze

    # @param default_payload [Hash] entries merged into the payload of every
    #   event sent to the backend. The hash is stored by reference, not copied.
    def initialize(default_payload = {})
      @default_payload = default_payload

      # ActiveSupport is an optional dependency: the backend is only picked up
      # when the constant is already defined at construction time.
      if defined?(ActiveSupport::Notifications)
        @backend = ActiveSupport::Notifications.instrumenter
      else
        @backend = nil
      end
    end

    # Instruments the given block under "<event_name>.kafka".
    #
    # With a backend, the default payload is merged into +payload+ IN PLACE
    # (the caller's hash is mutated, and default entries overwrite keys that
    # are already present) before the event is handed to the backend.
    #
    # Without a backend, the block is simply invoked with +payload+ as given;
    # the default payload is NOT merged on that path.
    #
    # @param event_name [#to_s] event name without the namespace suffix.
    # @param payload [Hash] event payload; also yielded to the block.
    # @yieldparam payload [Hash] the same hash object that was passed in.
    # @return without a backend, the block's value (nil when no block is
    #   given); with a backend, whatever the backend's #instrument returns.
    def instrument(event_name, payload = {})
      if @backend
        payload.update(@default_payload)

        @backend.instrument("#{event_name}.#{NAMESPACE}", payload) { yield payload if block_given? }
      else
        yield payload if block_given?
      end
    end

    # Forwards a "start" notification for "<event_name>.kafka" to the backend.
    # The default payload is merged into +payload+ in place first. Does
    # nothing and returns nil when there is no backend.
    #
    # @param event_name [#to_s] event name without the namespace suffix.
    # @param payload [Hash] event payload.
    def start(event_name, payload = {})
      if @backend
        payload.update(@default_payload)

        @backend.start("#{event_name}.#{NAMESPACE}", payload)
      end
    end

    # Forwards a "finish" notification for "<event_name>.kafka" to the
    # backend. The default payload is merged into +payload+ in place first.
    # Does nothing and returns nil when there is no backend.
    #
    # @param event_name [#to_s] event name without the namespace suffix.
    # @param payload [Hash] event payload.
    def finish(event_name, payload = {})
      if @backend
        payload.update(@default_payload)

        @backend.finish("#{event_name}.#{NAMESPACE}", payload)
      end
    end
  end

  # Wraps another instrumenter and adds a fixed set of extra payload entries
  # to every event before delegating to it.
  class DecoratingInstrumenter
    # @param backend [#instrument, #start, #finish] the instrumenter that
    #   receives the decorated events.
    # @param extra_payload [Hash] entries added to each event's payload.
    def initialize(backend, extra_payload = {})
      @extra_payload = extra_payload
      @backend = backend
    end

    # Delegates to the backend's #instrument with the decorated payload,
    # handing over the caller's block (if any) unchanged.
    def instrument(event_name, payload = {}, &block)
      decorated = decorate(payload)
      @backend.instrument(event_name, decorated, &block)
    end

    # Delegates to the backend's #start with the decorated payload.
    def start(event_name, payload = {})
      decorated = decorate(payload)
      @backend.start(event_name, decorated)
    end

    # Delegates to the backend's #finish with the decorated payload.
    def finish(event_name, payload = {})
      decorated = decorate(payload)
      @backend.finish(event_name, decorated)
    end

    private

    # Builds a new hash from the extra payload overlaid with +payload+;
    # entries in +payload+ win on key collisions and neither input is mutated.
    def decorate(payload)
      @extra_payload.merge(payload)
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
ruby-kafka-0.3.18.beta1 lib/kafka/instrumenter.rb