lib/rdkafka/config.rb in rdkafka-0.13.0.beta.2 vs lib/rdkafka/config.rb in rdkafka-0.13.0.beta.3
- old
+ new
@@ -155,17 +155,18 @@
if @consumer_rebalance_listener
opaque.consumer_rebalance_listener = @consumer_rebalance_listener
Rdkafka::Bindings.rd_kafka_conf_set_rebalance_cb(config, Rdkafka::Bindings::RebalanceCallback)
end
+ # Create native client
kafka = native_kafka(config, :rd_kafka_consumer)
# Redirect the main queue to the consumer
Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka)
# Return consumer with Kafka client
- Rdkafka::Consumer.new(kafka)
+ Rdkafka::Consumer.new(Rdkafka::NativeKafka.new(kafka, run_polling_thread: false))
end
# Create a producer with this configuration.
#
# @raise [ConfigError] When the configuration contains invalid options
@@ -179,11 +180,11 @@
config = native_config(opaque)
# Set callback to receive delivery reports on config
Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Callbacks::DeliveryCallbackFunction)
# Return producer with Kafka client
partitioner_name = self[:partitioner] || self["partitioner"]
- Rdkafka::Producer.new(Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer)), partitioner_name).tap do |producer|
+ Rdkafka::Producer.new(Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer), run_polling_thread: true), partitioner_name).tap do |producer|
opaque.producer = producer
end
end
# Create an admin instance with this configuration.
@@ -194,10 +195,10 @@
# @return [Admin] The created admin instance
def admin
opaque = Opaque.new
config = native_config(opaque)
Rdkafka::Bindings.rd_kafka_conf_set_background_event_cb(config, Rdkafka::Callbacks::BackgroundEventCallbackFunction)
- Rdkafka::Admin.new(Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer)))
+ Rdkafka::Admin.new(Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer), run_polling_thread: true))
end
# Error that is returned by the underlying rdkafka error if an invalid configuration option is present.
class ConfigError < RuntimeError; end