Sha256: 1d4614c9661365ddaee995f875e3032fc22fbb64308e9af928caf7ff9befdad7

Contents?: true

Size: 1.54 KB

Versions: 12

Compression:

Stored size: 1.54 KB

Contents

# frozen_string_literal: true

module WaterDrop
  module Patches
    module Rdkafka
      # Extends `Rdkafka::Bindings` with some extra methods and updates callbacks that we intend
      # to work with in a bit different way than rdkafka itself
      module Bindings
        class << self
          # Hook that runs when this module gets included into the rdkafka bindings module.
          # It attaches the `rd_kafka_name` function and replaces the `ErrorCallback` constant
          # with the callback built by {build_error_callback}.
          #
          # @param mod [::Rdkafka::Bindings] rdkafka bindings module
          def included(mod)
            mod.attach_function :rd_kafka_name, [:pointer], :string

            # Default rdkafka setup for errors does not propagate client details, thus it always
            # publishes all the stuff for all rdkafka instances. We change that by providing
            # function that fetches the instance name, allowing us to have better notifications
            mod.send(:remove_const, :ErrorCallback)
            mod.const_set(:ErrorCallback, build_error_callback)
          end

          # Builds the error callback that passes both the client name and the error to
          # `::Rdkafka::Config.error_callback`. When no error callback is configured, the
          # built function does nothing.
          #
          # @return [FFI::Function] overwritten callback function
          def build_error_callback
            FFI::Function.new(
              :void, %i[pointer int string pointer]
            ) do |client_ptr, err_code, reason, _opaque|
              # Read the callback once so the check and the invocation use the same object
              callback = ::Rdkafka::Config.error_callback

              # `next` and not `return`: this block runs long after `build_error_callback` has
              # finished, and a `return` from such a block raises `LocalJumpError`
              next unless callback

              name = ::Rdkafka::Bindings.rd_kafka_name(client_ptr)

              error = ::Rdkafka::RdkafkaError.new(err_code, broker_message: reason)

              callback.call(name, error)
            end
          end
        end
      end
    end
  end
end

# Apply the patch: including the module fires its `included` hook with `::Rdkafka::Bindings`
# as the argument, which is where the extra function and the new error callback get installed
::Rdkafka::Bindings.include(::WaterDrop::Patches::Rdkafka::Bindings)

Version data entries

12 entries across 12 versions & 1 rubygems

Version Path
waterdrop-2.4.2 lib/waterdrop/patches/rdkafka/bindings.rb
waterdrop-2.4.1 lib/waterdrop/patches/rdkafka/bindings.rb
waterdrop-2.4.0 lib/waterdrop/patches/rdkafka/bindings.rb
waterdrop-2.3.3 lib/waterdrop/patches/rdkafka/bindings.rb
waterdrop-2.3.2 lib/waterdrop/patches/rdkafka/bindings.rb
waterdrop-2.3.1 lib/waterdrop/patches/rdkafka/bindings.rb
waterdrop-2.3.0 lib/waterdrop/patches/rdkafka/bindings.rb
waterdrop-2.2.0 lib/waterdrop/patches/rdkafka/bindings.rb
waterdrop-2.1.0 lib/water_drop/patches/rdkafka/bindings.rb
waterdrop-2.0.7 lib/water_drop/patches/rdkafka/bindings.rb
waterdrop-2.0.6 lib/water_drop/patches/rdkafka/bindings.rb
waterdrop-2.0.5 lib/water_drop/patches/rdkafka/bindings.rb