lib/rdkafka/config.rb in rdkafka-0.2.0 vs lib/rdkafka/config.rb in rdkafka-0.3.0
- old
+ new
@@ -72,11 +72,11 @@
#
# @return [Consumer] The created consumer
def consumer
kafka = native_kafka(native_config, :rd_kafka_consumer)
# Redirect the main queue to the consumer
- Rdkafka::FFI.rd_kafka_poll_set_consumer(kafka)
+ Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka)
# Return consumer with Kafka client
Rdkafka::Consumer.new(kafka)
end
# Create a producer with this configuration.
@@ -87,11 +87,11 @@
# @return [Producer] The created producer
def producer
# Create Kafka config
config = native_config
# Set callback to receive delivery reports on config
- Rdkafka::FFI.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::FFI::DeliveryCallback)
+ Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Bindings::DeliveryCallback)
# Return producer with Kafka client
Rdkafka::Producer.new(native_kafka(config, :rd_kafka_producer))
end
# Error that is returned by the underlying rdkafka error if an invalid configuration option is present.
@@ -106,14 +106,14 @@
private
# This method is only intented to be used to create a client,
# using it in another way will leak memory.
def native_config
- Rdkafka::FFI.rd_kafka_conf_new.tap do |config|
+ Rdkafka::Bindings.rd_kafka_conf_new.tap do |config|
@config_hash.merge(REQUIRED_CONFIG).each do |key, value|
- error_buffer = ::FFI::MemoryPointer.from_string(" " * 256)
- result = Rdkafka::FFI.rd_kafka_conf_set(
+ error_buffer = FFI::MemoryPointer.from_string(" " * 256)
+ result = Rdkafka::Bindings.rd_kafka_conf_set(
config,
key.to_s,
value.to_s,
error_buffer,
256
@@ -121,17 +121,17 @@
unless result == :config_ok
raise ConfigError.new(error_buffer.read_string)
end
end
# Set log callback
- Rdkafka::FFI.rd_kafka_conf_set_log_cb(config, Rdkafka::FFI::LogCallback)
+ Rdkafka::Bindings.rd_kafka_conf_set_log_cb(config, Rdkafka::Bindings::LogCallback)
end
end
def native_kafka(config, type)
- error_buffer = ::FFI::MemoryPointer.from_string(" " * 256)
- handle = Rdkafka::FFI.rd_kafka_new(
+ error_buffer = FFI::MemoryPointer.from_string(" " * 256)
+ handle = Rdkafka::Bindings.rd_kafka_new(
type,
config,
error_buffer,
256
)
@@ -139,17 +139,17 @@
if handle.nil?
raise ClientCreationError.new(error_buffer.read_string)
end
# Redirect log to handle's queue
- Rdkafka::FFI.rd_kafka_set_log_queue(
+ Rdkafka::Bindings.rd_kafka_set_log_queue(
handle,
- Rdkafka::FFI.rd_kafka_queue_get_main(handle)
+ Rdkafka::Bindings.rd_kafka_queue_get_main(handle)
)
- ::FFI::AutoPointer.new(
+ FFI::AutoPointer.new(
handle,
- Rdkafka::FFI.method(:rd_kafka_destroy)
+ Rdkafka::Bindings.method(:rd_kafka_destroy)
)
end
end
end