lib/karafka/connection/config_adapter.rb in karafka-1.1.2 vs lib/karafka/connection/config_adapter.rb in karafka-1.2.0.beta1
- old
+ new
@@ -12,11 +12,14 @@
# into the client and breaking stuff
module ConfigAdapter
class << self
# Builds all the configuration settings for Kafka.new method
# @param _consumer_group [Karafka::Routing::ConsumerGroup] consumer group details
- # @return [Hash] hash with all the settings required by Kafka.new method
+ # @return [Array<Hash>] Array with all the client arguments including hash with all
+ # the settings required by Kafka.new method
+ # @note We return array, so we can inject any arguments we want, in case of changes in the
+ # raw driver
def client(_consumer_group)
# This one is a default that takes all the settings except special
# cases defined in the map
settings = {
logger: ::Karafka.logger,
@@ -31,31 +34,36 @@
next if AttributesMap.config_adapter.values.flatten.include?(setting_name)
settings[setting_name] = setting_value
end
- sanitize(settings)
+ settings_hash = sanitize(settings)
+
+ # Normalization for the way Kafka::Client accepts arguments from 0.5.3
+ [settings_hash.delete(:seed_brokers), settings_hash]
end
# Builds all the configuration settings for kafka#consumer method
# @param consumer_group [Karafka::Routing::ConsumerGroup] consumer group details
- # @return [Hash] hash with all the settings required by Kafka#consumer method
+ # @return [Array<Hash>] array with all the consumer arguments including hash with all
+ # the settings required by Kafka#consumer
def consumer(consumer_group)
settings = { group_id: consumer_group.id }
settings = fetch_for(:consumer, consumer_group, settings)
- sanitize(settings)
+ [sanitize(settings)]
end
# Builds all the configuration settings for kafka consumer consume_each_batch and
# consume_each_message methods
# @param consumer_group [Karafka::Routing::ConsumerGroup] consumer group details
- # @return [Hash] hash with all the settings required by
+ # @return [Array<Hash>] Array with all the arguments required by consuming method
+ # including hash with all the settings required by
# Kafka::Consumer#consume_each_message and Kafka::Consumer#consume_each_batch method
def consuming(consumer_group)
settings = {
automatically_mark_as_processed: consumer_group.automatically_mark_as_consumed
}
- sanitize(fetch_for(:consuming, consumer_group, settings))
+ [sanitize(fetch_for(:consuming, consumer_group, settings))]
end
# Builds all the configuration settings for kafka consumer#subscribe method
# @param topic [Karafka::Routing::Topic] topic that holds details for a given subscription
# @return [Hash] hash with all the settings required by kafka consumer#subscribe method