Sha256: 3c5f39fa69aacdb4ecadf9565a4f748c0d22a6aa97e8e92cb97086481cb40a5a

Contents?: true

Size: 1.88 KB

Versions: 5

Compression:

Stored size: 1.88 KB

Contents

# frozen_string_literal: true

module WaterDrop
  module Instrumentation
    module Callbacks
      # Callback that is triggered when oauth token needs to be refreshed.
      class OauthbearerTokenRefresh
        # @param bearer [Rdkafka::Producer] given rdkafka instance. It is needed as
        #   we need to have a reference to call `#oauthbearer_set_token` or
        #   `#oauthbearer_set_token_failure` upon the event.
        # @param monitor [WaterDrop::Instrumentation::Monitor] monitor we are using
        def initialize(bearer, monitor)
          @bearer = bearer
          @monitor = monitor
        end

        # Upon receiving of this event, user is required to invoke either `#oauthbearer_set_token`
        # or `#oauthbearer_set_token_failure` on the `event[:bearer]` depending whether token
        # obtaining was successful or not.
        #
        # Please refer to WaterDrop and Karafka documentation or `Rdkafka::Helpers::OAuth`
        # documentation directly for exact parameters of those methods.
        #
        # @param _rd_config [Rdkafka::Config]
        # @param bearer_name [String] name of the bearer for which we refresh
        def call(_rd_config, bearer_name)
          return unless @bearer.name == bearer_name

          @monitor.instrument(
            'oauthbearer.token_refresh',
            bearer: @bearer,
            caller: self
          )
        # This runs from the rdkafka thread, thus we want to safe-guard it and prevent absolute
        # crashes even if the instrumentation code fails. If it would bubble-up, it could crash
        # the rdkafka background thread
        rescue StandardError => e
          @monitor.instrument(
            'error.occurred',
            caller: self,
            error: e,
            producer_id: @producer_id,
            type: 'callbacks.oauthbearer_token_refresh.error'
          )
        end
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
waterdrop-2.8.1 lib/waterdrop/instrumentation/callbacks/oauthbearer_token_refresh.rb
waterdrop-2.8.0 lib/waterdrop/instrumentation/callbacks/oauthbearer_token_refresh.rb
waterdrop-2.7.4 lib/waterdrop/instrumentation/callbacks/oauthbearer_token_refresh.rb
waterdrop-2.7.3 lib/waterdrop/instrumentation/callbacks/oauthbearer_token_refresh.rb
waterdrop-2.7.0 lib/waterdrop/instrumentation/callbacks/oauthbearer_token_refresh.rb