Sha256: 517be6dc0590fcccf9b7465412949c5412dcc369820ee263b06b5d4875bb7946

Contents?: true

Size: 1.55 KB

Versions: 9

Compression:

Stored size: 1.55 KB

Contents

# frozen_string_literal: true

module Karafka
  module Core
    # Patches to dependencies and components
    module Patches
      # Patches to rdkafka
      module Rdkafka
        # Extends `Rdkafka::Bindings` with some extra methods and updates callbacks that we intend
        # to work with in a bit different way than rdkafka itself
        module Bindings
          class << self
            # Add extra methods that we need
            # @param mod [::Rdkafka::Bindings] rdkafka bindings module
            def included(mod)
              # Default rdkafka setup for errors doest not propagate client details, thus it always
              # publishes all the stuff for all rdkafka instances. We change that by providing
              # function that fetches the instance name, allowing us to have better notifications
              mod.send(:remove_const, :ErrorCallback)
              mod.const_set(:ErrorCallback, build_error_callback)
            end

            # @return [FFI::Function] overwritten callback function
            def build_error_callback
              FFI::Function.new(
                :void, %i[pointer int string pointer]
              ) do |client_prr, err_code, reason, _opaque|
                return nil unless ::Rdkafka::Config.error_callback

                name = ::Rdkafka::Bindings.rd_kafka_name(client_prr)

                error = ::Rdkafka::RdkafkaError.new(err_code, broker_message: reason)

                ::Rdkafka::Config.error_callback.call(name, error)
              end
            end
          end
        end
      end
    end
  end
end

Version data entries

9 entries across 9 versions & 1 rubygems

Version Path
karafka-core-2.2.5 lib/karafka/core/patches/rdkafka/bindings.rb
karafka-core-2.2.4 lib/karafka/core/patches/rdkafka/bindings.rb
karafka-core-2.2.3 lib/karafka/core/patches/rdkafka/bindings.rb
karafka-core-2.2.2 lib/karafka/core/patches/rdkafka/bindings.rb
karafka-core-2.2.1 lib/karafka/core/patches/rdkafka/bindings.rb
karafka-core-2.2.0 lib/karafka/core/patches/rdkafka/bindings.rb
karafka-core-2.1.1 lib/karafka/core/patches/rdkafka/bindings.rb
karafka-core-2.1.0 lib/karafka/core/patches/rdkafka/bindings.rb
karafka-core-2.1.0.beta1 lib/karafka/core/patches/rdkafka/bindings.rb