lib/rdkafka/config.rb in rdkafka-0.14.1 vs lib/rdkafka/config.rb in rdkafka-0.15.0

- old
+ new

@@ -110,10 +110,11 @@ # # @return [Config] def initialize(config_hash = {}) @config_hash = DEFAULT_CONFIG.merge(config_hash) @consumer_rebalance_listener = nil + @consumer_poll_set = true end # Set a config option. # # @param key [String] The config option's key @@ -138,10 +139,26 @@ # @param listener [Object, #on_partitions_assigned, #on_partitions_revoked] listener instance def consumer_rebalance_listener=(listener) @consumer_rebalance_listener = listener end + # Should we use a single queue for the underlying consumer and events. + # + # This is an advanced API that allows for more granular control of the polling process. + # When this value is set to `false` (`true` by defualt), there will be two queues that need to + # be polled: + # - main librdkafka queue for events + # - consumer queue with messages and rebalances + # + # It is recommended to use the defaults and only set it to `false` in advance multi-threaded + # and complex cases where granular events handling control is needed. + # + # @param poll_set [Boolean] + def consumer_poll_set=(poll_set) + @consumer_poll_set = poll_set + end + # Creates a consumer with this configuration. # # @return [Consumer] The created consumer # # @raise [ConfigError] When the configuration contains invalid options @@ -156,12 +173,12 @@ end # Create native client kafka = native_kafka(config, :rd_kafka_consumer) - # Redirect the main queue to the consumer - Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka) + # Redirect the main queue to the consumer queue + Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka) if @consumer_poll_set # Return consumer with Kafka client Rdkafka::Consumer.new( Rdkafka::NativeKafka.new( kafka, @@ -185,10 +202,14 @@ # Set callback to receive delivery reports on config Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Callbacks::DeliveryCallbackFunction) # Return producer with Kafka client partitioner_name = self[:partitioner] || self["partitioner"] Rdkafka::Producer.new( - Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer), run_polling_thread: true, opaque: opaque), + Rdkafka::NativeKafka.new( + native_kafka(config, :rd_kafka_producer), + run_polling_thread: true, + opaque: opaque + ), partitioner_name ).tap do |producer| opaque.producer = producer end end