lib/kafka/producer.rb in ruby-kafka-0.7.5 vs lib/kafka/producer.rb in ruby-kafka-0.7.6.beta1
- old
+ new
@@ -128,11 +128,11 @@
class AbortTransaction < StandardError; end
def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)
@cluster = cluster
@transaction_manager = transaction_manager
- @logger = logger
+ @logger = TaggedLogger.new(logger)
@instrumenter = instrumenter
@required_acks = required_acks == :all ? -1 : required_acks
@ack_timeout = ack_timeout
@max_retries = max_retries
@retry_backoff = retry_backoff
@@ -148,10 +148,14 @@
# Messages added by `#produce` but not yet assigned a partition.
@pending_message_queue = PendingMessageQueue.new
end
+ def to_s
+ "Producer #{@target_topics.to_a.join(', ')}"
+ end
+
# Produces a message to the specified topic. Note that messages are buffered in
# the producer until {#deliver_messages} is called.
#
# ## Partitioning
#
@@ -203,11 +207,11 @@
end
# If the producer is in transactional mode, all the message production
# must be used when the producer is currently in transaction
if @transaction_manager.transactional? && !@transaction_manager.in_transaction?
- raise 'You must trigger begin_transaction before producing messages'
+ raise "Cannot produce to #{topic}: You must trigger begin_transaction before producing messages"
end
@target_topics.add(topic)
@pending_message_queue.write(message)
@@ -389,15 +393,15 @@
end
if buffer_size.zero?
break
elsif attempt <= @max_retries
- @logger.warn "Failed to send all messages; attempting retry #{attempt} of #{@max_retries} after #{@retry_backoff}s"
+ @logger.warn "Failed to send all messages to #{pretty_partitions}; attempting retry #{attempt} of #{@max_retries} after #{@retry_backoff}s"
sleep @retry_backoff
else
- @logger.error "Failed to send all messages; keeping remaining messages in buffer"
+ @logger.error "Failed to send all messages to #{pretty_partitions}; keeping remaining messages in buffer"
break
end
end
unless @pending_message_queue.empty?
@@ -405,13 +409,15 @@
@cluster.mark_as_stale!
raise DeliveryFailed.new("Failed to assign partitions to #{@pending_message_queue.size} messages", buffer_messages)
end
unless @buffer.empty?
- partitions = @buffer.map {|topic, partition, _| "#{topic}/#{partition}" }.join(", ")
-
- raise DeliveryFailed.new("Failed to send messages to #{partitions}", buffer_messages)
+ raise DeliveryFailed.new("Failed to send messages to #{pretty_partitions}", buffer_messages)
end
+ end
+
+ def pretty_partitions
+ @buffer.map {|topic, partition, _| "#{topic}/#{partition}" }.join(", ")
end
def assign_partitions!
failed_messages = []
topics_with_failures = Set.new