lib/kafka/producer.rb in ruby-kafka-1.1.0 vs lib/kafka/producer.rb in ruby-kafka-1.2.0

- old
+ new

@@ -5,10 +5,11 @@ require "kafka/message_buffer" require "kafka/produce_operation" require "kafka/pending_message_queue" require "kafka/pending_message" require "kafka/compressor" +require "kafka/interceptors" module Kafka # Allows sending messages to a Kafka cluster. # # Typically you won't instantiate this class yourself, but rather have {Kafka::Client} @@ -127,11 +128,13 @@ # end # class Producer class AbortTransaction < StandardError; end - def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:) + def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, + required_acks:, max_retries:, retry_backoff:, max_buffer_size:, + max_buffer_bytesize:, partitioner:, interceptors: []) @cluster = cluster @transaction_manager = transaction_manager @logger = TaggedLogger.new(logger) @instrumenter = instrumenter @required_acks = required_acks == :all ? -1 : required_acks @@ -139,10 +142,12 @@ @max_retries = max_retries @retry_backoff = retry_backoff @max_buffer_size = max_buffer_size @max_buffer_bytesize = max_buffer_bytesize @compressor = compressor + @partitioner = partitioner + @interceptors = Interceptors.new(interceptors: interceptors, logger: logger) # The set of topics that are produced to. @target_topics = Set.new # A buffer organized by topic/partition. @@ -189,19 +194,19 @@ # @return [nil] def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, create_time: Time.now) # We want to fail fast if `topic` isn't a String topic = topic.to_str - message = PendingMessage.new( + message = @interceptors.call(PendingMessage.new( value: value && value.to_s, key: key && key.to_s, headers: headers, topic: topic, partition: partition && Integer(partition), partition_key: partition_key && partition_key.to_s, create_time: create_time - ) + )) if buffer_size >= @max_buffer_size buffer_overflow topic, "Cannot produce to #{topic}, max buffer size (#{@max_buffer_size} messages) reached" end @@ -453,10 +458,10 @@ next end if partition.nil? partition_count = @cluster.partitions_for(message.topic).count - partition = Partitioner.partition_for_key(partition_count, message) + partition = @partitioner.call(partition_count, message) end @buffer.write( value: message.value, key: message.key,