lib/kafka/producer.rb in ruby-kafka-1.1.0 vs lib/kafka/producer.rb in ruby-kafka-1.2.0
- old
+ new
@@ -5,10 +5,11 @@
require "kafka/message_buffer"
require "kafka/produce_operation"
require "kafka/pending_message_queue"
require "kafka/pending_message"
require "kafka/compressor"
+require "kafka/interceptors"
module Kafka
# Allows sending messages to a Kafka cluster.
#
# Typically you won't instantiate this class yourself, but rather have {Kafka::Client}
@@ -127,11 +128,13 @@
# end
#
class Producer
class AbortTransaction < StandardError; end
- def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)
+ def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:,
+ required_acks:, max_retries:, retry_backoff:, max_buffer_size:,
+ max_buffer_bytesize:, partitioner:, interceptors: [])
@cluster = cluster
@transaction_manager = transaction_manager
@logger = TaggedLogger.new(logger)
@instrumenter = instrumenter
@required_acks = required_acks == :all ? -1 : required_acks
@@ -139,10 +142,12 @@
@max_retries = max_retries
@retry_backoff = retry_backoff
@max_buffer_size = max_buffer_size
@max_buffer_bytesize = max_buffer_bytesize
@compressor = compressor
+ @partitioner = partitioner
+ @interceptors = Interceptors.new(interceptors: interceptors, logger: logger)
# The set of topics that are produced to.
@target_topics = Set.new
# A buffer organized by topic/partition.
@@ -189,19 +194,19 @@
# @return [nil]
def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, create_time: Time.now)
# We want to fail fast if `topic` isn't a String
topic = topic.to_str
- message = PendingMessage.new(
+ message = @interceptors.call(PendingMessage.new(
value: value && value.to_s,
key: key && key.to_s,
headers: headers,
topic: topic,
partition: partition && Integer(partition),
partition_key: partition_key && partition_key.to_s,
create_time: create_time
- )
+ ))
if buffer_size >= @max_buffer_size
buffer_overflow topic,
"Cannot produce to #{topic}, max buffer size (#{@max_buffer_size} messages) reached"
end
@@ -453,10 +458,10 @@
next
end
if partition.nil?
partition_count = @cluster.partitions_for(message.topic).count
- partition = Partitioner.partition_for_key(partition_count, message)
+ partition = @partitioner.call(partition_count, message)
end
@buffer.write(
value: message.value,
key: message.key,