lib/deimos/config/configuration.rb in deimos-ruby-1.22.5 vs lib/deimos/config/configuration.rb in deimos-ruby-1.23.0
- old
+ new
@@ -77,10 +77,11 @@
end
end
# @!visibility private
# @param kafka_config [FigTree::ConfigStruct]
+ # rubocop:disable Metrics/PerceivedComplexity, Metrics/AbcSize
def self.configure_producer_or_consumer(kafka_config)
klass = kafka_config.class_name.constantize
klass.class_eval do
topic(kafka_config.topic) if kafka_config.topic.present? && klass.respond_to?(:topic)
schema(kafka_config.schema) if kafka_config.schema.present?
@@ -88,15 +89,22 @@
key_config(**kafka_config.key_config) if kafka_config.key_config.present?
schema_class_config(kafka_config.use_schema_classes) if kafka_config.use_schema_classes.present?
if kafka_config.respond_to?(:bulk_import_id_column) # consumer
klass.config.merge!(
bulk_import_id_column: kafka_config.bulk_import_id_column,
- replace_associations: kafka_config.replace_associations
+ replace_associations: if kafka_config.replace_associations.nil?
+ Deimos.config.consumers.replace_associations
+ else
+ kafka_config.replace_associations
+ end,
+ bulk_import_id_generator: kafka_config.bulk_import_id_generator ||
+ Deimos.config.consumers.bulk_import_id_generator
)
end
end
end
+ # rubocop:enable Metrics/PerceivedComplexity, Metrics/AbcSize
define_settings do
# @return [Logger]
setting :logger, Logger.new(STDOUT)
@@ -240,10 +248,19 @@
# Block taking an exception, payload and metadata and returning
# true if this should be considered a fatal error and false otherwise.
# Not needed if reraise_errors is set to true.
# @return [Block]
setting(:fatal_error, proc { false })
+
+ # The default function to generate a bulk ID for bulk consumers
+ # @return [Block]
+ setting(:bulk_import_id_generator, proc { SecureRandom.uuid })
+
+ # If true, multi-table consumers will blow away associations rather than appending to them.
+ # Applies to all consumers unless specified otherwise
+ # @return [Boolean]
+ setting :replace_associations, true
end
setting :producers do
# Number of seconds a broker can wait for replicas to acknowledge
# a write before responding with a timeout.
@@ -443,10 +460,16 @@
# Column to use for bulk imports, for multi-table feature.
# @return [String]
setting :bulk_import_id_column, :bulk_import_id
# If true, multi-table consumers will blow away associations rather than appending to them.
# @return [Boolean]
- setting :replace_associations, true
+ setting :replace_associations, nil
+
+ # The default function to generate a bulk ID for this consumer
+ # Uses the consumers proc defined in the consumers config by default unless
+ # specified for individual consumers
+ # @return [Block]
+ setting :bulk_import_id_generator, nil
# These are the phobos "listener" configs. See CONFIGURATION.md for more
# info.
setting :group_id
setting :max_concurrency, 1