lib/kafka/producer.rb in ruby-kafka-0.3.2 vs lib/kafka/producer.rb in ruby-kafka-0.3.3
- old
+ new
@@ -128,13 +128,14 @@
# producer.shutdown
# end
#
class Producer
- def initialize(cluster:, logger:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)
+ def initialize(cluster:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)
@cluster = cluster
@logger = logger
+ @instrumenter = instrumenter
@required_acks = required_acks
@ack_timeout = ack_timeout
@max_retries = max_retries
@retry_backoff = retry_backoff
@max_buffer_size = max_buffer_size
@@ -201,11 +202,11 @@
end
@target_topics.add(topic)
@pending_message_queue.write(message)
- Instrumentation.instrument("produce_message.producer.kafka", {
+ @instrumenter.instrument("produce_message.producer", {
value: value,
key: key,
topic: topic,
create_time: create_time,
buffer_size: buffer_size,
@@ -226,11 +227,11 @@
# @return [nil]
def deliver_messages
# There's no need to do anything if the buffer is empty.
return if buffer_size == 0
- Instrumentation.instrument("deliver_messages.producer.kafka") do |notification|
+ @instrumenter.instrument("deliver_messages.producer") do |notification|
message_count = buffer_size
notification[:message_count] = message_count
notification[:attempts] = 0
@@ -251,10 +252,18 @@
def buffer_bytesize
@pending_message_queue.bytesize + @buffer.bytesize
end
+ # Deletes all buffered messages.
+ #
+ # @return [nil]
+ def clear_buffer
+ @buffer.clear
+ @pending_message_queue.clear
+ end
+
# Closes all connections to the brokers.
#
# @return [nil]
def shutdown
@cluster.disconnect
@@ -272,10 +281,11 @@
buffer: @buffer,
required_acks: @required_acks,
ack_timeout: @ack_timeout,
compressor: @compressor,
logger: @logger,
+ instrumenter: @instrumenter,
)
loop do
attempt += 1
@@ -284,10 +294,17 @@
@cluster.refresh_metadata_if_necessary!
assign_partitions!
operation.execute
+ if @required_acks.zero?
+ # No response is returned by the brokers, so we can't know which messages
+ # have been successfully written. Our only option is to assume that they all
+ # have.
+ @buffer.clear
+ end
+
if buffer_size.zero?
break
elsif attempt <= @max_retries
@logger.warn "Failed to send all messages; attempting retry #{attempt} of #{@max_retries} after #{@retry_backoff}s"
@@ -296,14 +313,11 @@
@logger.error "Failed to send all messages; keeping remaining messages in buffer"
break
end
end
- if @required_acks == 0
- # No response is returned by the brokers, so we can't know which messages
- # have been successfully written. Our only option is to assume that they all
- # have.
- @buffer.clear
+ unless @pending_message_queue.empty?
+ raise DeliveryFailed, "Failed to assign partitions to #{@pending_message_queue.size} messages"
end
unless @buffer.empty?
partitions = @buffer.map {|topic, partition, _| "#{topic}/#{partition}" }.join(", ")