Sha256: ff75800cbbd49fdb190ac27af6f0cc3b928b4e5f78c879ece609ce82538162bf

Contents?: true

Size: 1.91 KB

Versions: 8

Compression:

Stored size: 1.91 KB

Contents

module Rdkafka
  class Config
    DEFAULT_CONFIG = {
      :"api.version.request" => true
    }

    def initialize(config_hash = {})
      @config_hash = DEFAULT_CONFIG.merge(config_hash)
    end

    def []=(key, value)
      @config_hash[key] = value
    end

    def [](key)
      @config_hash[key]
    end

    def consumer
      kafka = native_kafka(native_config, :rd_kafka_consumer)
      Rdkafka::FFI.rd_kafka_poll_set_consumer(kafka)
      Rdkafka::Consumer.new(kafka)
    end

    def producer
      # Create Kafka config
      config = native_config
      # Set callback to receive delivery reports on config
      Rdkafka::FFI.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::FFI::DeliveryCallback)
      # Return producer with Kafka client
      Rdkafka::Producer.new(native_kafka(config, :rd_kafka_producer))
    end

    class ConfigError < RuntimeError; end
    class ClientCreationError < RuntimeError; end

    private

    # This method is only intented to be used to create a client,
    # using it in another way will leak memory.
    def native_config
      config = Rdkafka::FFI.rd_kafka_conf_new

      @config_hash.each do |key, value|
        error_buffer = ::FFI::MemoryPointer.from_string(" " * 256)
        result = Rdkafka::FFI.rd_kafka_conf_set(
          config,
          key.to_s,
          value.to_s,
          error_buffer,
          256
        )
        unless result == :config_ok
          raise ConfigError.new(error_buffer.read_string)
        end
      end

      config
    end

    def native_kafka(config, type)
      error_buffer = ::FFI::MemoryPointer.from_string(" " * 256)
      handle = Rdkafka::FFI.rd_kafka_new(
        type,
        config,
        error_buffer,
        256
      )

      if handle.nil?
        raise ClientCreationError.new(error_buffer.read_string)
      end

      ::FFI::AutoPointer.new(
        handle,
        Rdkafka::FFI.method(:rd_kafka_destroy)
      )
    end
  end
end

Version data entries

8 entries across 8 versions & 1 rubygems

Version Path
rdkafka-0.1.7 lib/rdkafka/config.rb
rdkafka-0.1.6 lib/rdkafka/config.rb
rdkafka-0.1.5 lib/rdkafka/config.rb
rdkafka-0.1.4 lib/rdkafka/config.rb
rdkafka-0.1.3 lib/rdkafka/config.rb
rdkafka-0.1.2 lib/rdkafka/config.rb
rdkafka-0.1.1 lib/rdkafka/config.rb
rdkafka-0.1.0 lib/rdkafka/config.rb