lib/rdkafka/config.rb in rdkafka-0.2.0 vs lib/rdkafka/config.rb in rdkafka-0.3.0

- old
+ new

@@ -72,11 +72,11 @@ # # @return [Consumer] The created consumer def consumer kafka = native_kafka(native_config, :rd_kafka_consumer) # Redirect the main queue to the consumer - Rdkafka::FFI.rd_kafka_poll_set_consumer(kafka) + Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka) # Return consumer with Kafka client Rdkafka::Consumer.new(kafka) end # Create a producer with this configuration. @@ -87,11 +87,11 @@ # @return [Producer] The created producer def producer # Create Kafka config config = native_config # Set callback to receive delivery reports on config - Rdkafka::FFI.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::FFI::DeliveryCallback) + Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Bindings::DeliveryCallback) # Return producer with Kafka client Rdkafka::Producer.new(native_kafka(config, :rd_kafka_producer)) end # Error that is returned by the underlying rdkafka error if an invalid configuration option is present. @@ -106,14 +106,14 @@ private # This method is only intented to be used to create a client, # using it in another way will leak memory. def native_config - Rdkafka::FFI.rd_kafka_conf_new.tap do |config| + Rdkafka::Bindings.rd_kafka_conf_new.tap do |config| @config_hash.merge(REQUIRED_CONFIG).each do |key, value| - error_buffer = ::FFI::MemoryPointer.from_string(" " * 256) - result = Rdkafka::FFI.rd_kafka_conf_set( + error_buffer = FFI::MemoryPointer.from_string(" " * 256) + result = Rdkafka::Bindings.rd_kafka_conf_set( config, key.to_s, value.to_s, error_buffer, 256 @@ -121,17 +121,17 @@ unless result == :config_ok raise ConfigError.new(error_buffer.read_string) end end # Set log callback - Rdkafka::FFI.rd_kafka_conf_set_log_cb(config, Rdkafka::FFI::LogCallback) + Rdkafka::Bindings.rd_kafka_conf_set_log_cb(config, Rdkafka::Bindings::LogCallback) end end def native_kafka(config, type) - error_buffer = ::FFI::MemoryPointer.from_string(" " * 256) - handle = Rdkafka::FFI.rd_kafka_new( + error_buffer = FFI::MemoryPointer.from_string(" " * 256) + handle = Rdkafka::Bindings.rd_kafka_new( type, config, error_buffer, 256 ) @@ -139,17 +139,17 @@ if handle.nil? raise ClientCreationError.new(error_buffer.read_string) end # Redirect log to handle's queue - Rdkafka::FFI.rd_kafka_set_log_queue( + Rdkafka::Bindings.rd_kafka_set_log_queue( handle, - Rdkafka::FFI.rd_kafka_queue_get_main(handle) + Rdkafka::Bindings.rd_kafka_queue_get_main(handle) ) - ::FFI::AutoPointer.new( + FFI::AutoPointer.new( handle, - Rdkafka::FFI.method(:rd_kafka_destroy) + Rdkafka::Bindings.method(:rd_kafka_destroy) ) end end end