Sha256: 517be6dc0590fcccf9b7465412949c5412dcc369820ee263b06b5d4875bb7946
Contents?: true
Size: 1.55 KB
Versions: 9
Compression:
Stored size: 1.55 KB
Contents
# frozen_string_literal: true module Karafka module Core # Patches to dependencies and components module Patches # Patches to rdkafka module Rdkafka # Extends `Rdkafka::Bindings` with some extra methods and updates callbacks that we intend # to work with in a bit different way than rdkafka itself module Bindings class << self # Add extra methods that we need # @param mod [::Rdkafka::Bindings] rdkafka bindings module def included(mod) # Default rdkafka setup for errors doest not propagate client details, thus it always # publishes all the stuff for all rdkafka instances. We change that by providing # function that fetches the instance name, allowing us to have better notifications mod.send(:remove_const, :ErrorCallback) mod.const_set(:ErrorCallback, build_error_callback) end # @return [FFI::Function] overwritten callback function def build_error_callback FFI::Function.new( :void, %i[pointer int string pointer] ) do |client_prr, err_code, reason, _opaque| return nil unless ::Rdkafka::Config.error_callback name = ::Rdkafka::Bindings.rd_kafka_name(client_prr) error = ::Rdkafka::RdkafkaError.new(err_code, broker_message: reason) ::Rdkafka::Config.error_callback.call(name, error) end end end end end end end end
Version data entries
9 entries across 9 versions & 1 rubygems