lib/rdkafka/config.rb in rdkafka-0.13.0.beta.2 vs lib/rdkafka/config.rb in rdkafka-0.13.0.beta.3

- old
+ new

@@ -155,17 +155,18 @@ if @consumer_rebalance_listener opaque.consumer_rebalance_listener = @consumer_rebalance_listener Rdkafka::Bindings.rd_kafka_conf_set_rebalance_cb(config, Rdkafka::Bindings::RebalanceCallback) end + # Create native client kafka = native_kafka(config, :rd_kafka_consumer) # Redirect the main queue to the consumer Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka) # Return consumer with Kafka client - Rdkafka::Consumer.new(kafka) + Rdkafka::Consumer.new(Rdkafka::NativeKafka.new(kafka, run_polling_thread: false)) end # Create a producer with this configuration. # # @raise [ConfigError] When the configuration contains invalid options @@ -179,11 +180,11 @@ config = native_config(opaque) # Set callback to receive delivery reports on config Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Callbacks::DeliveryCallbackFunction) # Return producer with Kafka client partitioner_name = self[:partitioner] || self["partitioner"] - Rdkafka::Producer.new(Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer)), partitioner_name).tap do |producer| + Rdkafka::Producer.new(Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer), run_polling_thread: true), partitioner_name).tap do |producer| opaque.producer = producer end end # Create an admin instance with this configuration. @@ -194,10 +195,10 @@ # @return [Admin] The created admin instance def admin opaque = Opaque.new config = native_config(opaque) Rdkafka::Bindings.rd_kafka_conf_set_background_event_cb(config, Rdkafka::Callbacks::BackgroundEventCallbackFunction) - Rdkafka::Admin.new(Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer))) + Rdkafka::Admin.new(Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer), run_polling_thread: true)) end # Error that is returned by the underlying rdkafka error if an invalid configuration option is present. class ConfigError < RuntimeError; end