lib/rdkafka/config.rb in rdkafka-0.4.2 vs lib/rdkafka/config.rb in rdkafka-0.5.0
- old
+ new
@@ -70,10 +70,11 @@
# @param config_hash [Hash<String,Symbol => String>] The config options for rdkafka
#
# @return [Config]
def initialize(config_hash = {})
@config_hash = DEFAULT_CONFIG.merge(config_hash)
+ @consumer_rebalance_listener = nil
end
# Set a config option.
#
# @param key [String] The config option's key
@@ -91,20 +92,37 @@
# @return [String, nil] The config option or `nil` if it is not present
def [](key)
@config_hash[key]
end
+ # Get notifications on partition assignment/revocation for the subscribed topics
+ #
+ # @param listener [Object, #on_partitions_assigned, #on_partitions_revoked] listener instance
+ def consumer_rebalance_listener=(listener)
+ @consumer_rebalance_listener = listener
+ end
+
# Create a consumer with this configuration.
#
# @raise [ConfigError] When the configuration contains invalid options
# @raise [ClientCreationError] When the native client cannot be created
#
# @return [Consumer] The created consumer
def consumer
- kafka = native_kafka(native_config, :rd_kafka_consumer)
+ opaque = Opaque.new
+ config = native_config(opaque)
+
+ if @consumer_rebalance_listener
+ opaque.consumer_rebalance_listener = @consumer_rebalance_listener
+ Rdkafka::Bindings.rd_kafka_conf_set_rebalance_cb(config, Rdkafka::Bindings::RebalanceCallback)
+ end
+
+ kafka = native_kafka(config, :rd_kafka_consumer)
+
# Redirect the main queue to the consumer
Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka)
+
# Return consumer with Kafka client
Rdkafka::Consumer.new(kafka)
end
# Create a producer with this configuration.
@@ -202,11 +220,26 @@
end
# @private
class Opaque
attr_accessor :producer
+ attr_accessor :consumer_rebalance_listener
def call_delivery_callback(delivery_handle)
producer.call_delivery_callback(delivery_handle) if producer
+ end
+
+ def call_on_partitions_assigned(consumer, list)
+ return unless consumer_rebalance_listener
+ return unless consumer_rebalance_listener.respond_to?(:on_partitions_assigned)
+
+ consumer_rebalance_listener.on_partitions_assigned(consumer, list)
+ end
+
+ def call_on_partitions_revoked(consumer, list)
+ return unless consumer_rebalance_listener
+ return unless consumer_rebalance_listener.respond_to?(:on_partitions_revoked)
+
+ consumer_rebalance_listener.on_partitions_revoked(consumer, list)
end
end
end