Sha256: faddfd641426dd260661f2cf64d944c8f6a3d659706a6ab2d6c4c93e5678296c

Size: 1.6 KB

Versions: 6

Stored size: 1.6 KB

Contents

# frozen_string_literal: true

module Karafka
  module Core
    # Patches to dependencies and components
    module Patches
      # Patches to rdkafka
      module Rdkafka
        # Extends `Rdkafka::Bindings` with extra methods and replaces callbacks that we want to
        # handle differently than rdkafka itself does
        module Bindings
          class << self
            # Add extra methods that we need
            # @param mod [::Rdkafka::Bindings] rdkafka bindings module
            def included(mod)
              # The default rdkafka error callback does not propagate client details, so errors
              # from all rdkafka instances are published without distinction. We replace it with a
              # callback that fetches the instance name, so each notification identifies its client
              mod.send(:remove_const, :ErrorCallback)
              mod.const_set(:ErrorCallback, build_error_callback)
            end

            # @return [FFI::Function] overwritten callback function
            def build_error_callback
              FFI::Function.new(
                :void, %i[pointer int string pointer]
              ) do |client_prr, err_code, reason, _opaque|
                # `next` (not `return`) exits the block; the enclosing method has already returned
                next unless ::Rdkafka::Config.error_callback

                name = ::Rdkafka::Bindings.rd_kafka_name(client_prr)

                error = ::Rdkafka::RdkafkaError.new(err_code, broker_message: reason)
                error.set_backtrace(caller)

                ::Rdkafka::Config.error_callback.call(name, error)
              end
            end
          end
        end
      end
    end
  end
end
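
The constant swap performed in `included` can be tried in isolation. The following is a minimal sketch, not the library's code: `FakeBindings` and `FakePatch` are hypothetical stand-ins for `::Rdkafka::Bindings` and the patch module above, and plain lambdas replace `FFI::Function` so that it runs without rdkafka installed.

```ruby
# Hypothetical stand-in for ::Rdkafka::Bindings (the real module wraps librdkafka via FFI)
module FakeBindings
  # Original callback: knows nothing about which client raised the error
  ErrorCallback = ->(err_code) { "generic error #{err_code}" }
end

# Hypothetical stand-in for the patch module shown above
module FakePatch
  class << self
    # Runs when the patch is included; swaps the constant on the including module
    def included(mod)
      # remove_const is private, hence send; removing first avoids a redefinition warning
      mod.send(:remove_const, :ErrorCallback)
      mod.const_set(:ErrorCallback, build_error_callback)
    end

    # Replacement callback that also receives the client name
    def build_error_callback
      ->(name, err_code) { "#{name}: error #{err_code}" }
    end
  end
end

# Including the patch triggers FakePatch.included(FakeBindings)
FakeBindings.include(FakePatch)

puts FakeBindings::ErrorCallback.call('producer-1', 3)
```

After the include, `FakeBindings::ErrorCallback` refers to the replacement, which is why the patch has to be applied before anything captures the original constant.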

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
karafka-core-2.4.0.alpha1 lib/karafka/core/patches/rdkafka/bindings.rb
karafka-core-2.3.0 lib/karafka/core/patches/rdkafka/bindings.rb
karafka-core-2.3.0.rc1 lib/karafka/core/patches/rdkafka/bindings.rb
karafka-core-2.3.0.alpha1 lib/karafka/core/patches/rdkafka/bindings.rb
karafka-core-2.2.7 lib/karafka/core/patches/rdkafka/bindings.rb
karafka-core-2.2.6 lib/karafka/core/patches/rdkafka/bindings.rb