lib/rdkafka/config.rb in rdkafka-0.14.1 vs lib/rdkafka/config.rb in rdkafka-0.15.0
- old
+ new
@@ -110,10 +110,11 @@
#
# @return [Config]
def initialize(config_hash = {})
@config_hash = DEFAULT_CONFIG.merge(config_hash)
@consumer_rebalance_listener = nil
+ @consumer_poll_set = true
end
# Set a config option.
#
# @param key [String] The config option's key
@@ -138,10 +139,26 @@
# @param listener [Object, #on_partitions_assigned, #on_partitions_revoked] listener instance
def consumer_rebalance_listener=(listener)
@consumer_rebalance_listener = listener
end
+ # Should we use a single queue for the underlying consumer and events.
+ #
+ # This is an advanced API that allows for more granular control of the polling process.
+ # When this value is set to `false` (`true` by defualt), there will be two queues that need to
+ # be polled:
+ # - main librdkafka queue for events
+ # - consumer queue with messages and rebalances
+ #
+ # It is recommended to use the defaults and only set it to `false` in advance multi-threaded
+ # and complex cases where granular events handling control is needed.
+ #
+ # @param poll_set [Boolean]
+ def consumer_poll_set=(poll_set)
+ @consumer_poll_set = poll_set
+ end
+
# Creates a consumer with this configuration.
#
# @return [Consumer] The created consumer
#
# @raise [ConfigError] When the configuration contains invalid options
@@ -156,12 +173,12 @@
end
# Create native client
kafka = native_kafka(config, :rd_kafka_consumer)
- # Redirect the main queue to the consumer
- Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka)
+ # Redirect the main queue to the consumer queue
+ Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka) if @consumer_poll_set
# Return consumer with Kafka client
Rdkafka::Consumer.new(
Rdkafka::NativeKafka.new(
kafka,
@@ -185,10 +202,14 @@
# Set callback to receive delivery reports on config
Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Callbacks::DeliveryCallbackFunction)
# Return producer with Kafka client
partitioner_name = self[:partitioner] || self["partitioner"]
Rdkafka::Producer.new(
- Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer), run_polling_thread: true, opaque: opaque),
+ Rdkafka::NativeKafka.new(
+ native_kafka(config, :rd_kafka_producer),
+ run_polling_thread: true,
+ opaque: opaque
+ ),
partitioner_name
).tap do |producer|
opaque.producer = producer
end
end