lib/kafka/producer.rb in ruby-kafka-0.1.3 vs lib/kafka/producer.rb in ruby-kafka-0.1.4

- old
+ new

@@ -1,9 +1,9 @@
 require "kafka/partitioner"
 require "kafka/message_buffer"
-require "kafka/protocol/message"
-require "kafka/transmission"
+require "kafka/produce_operation"
+require "kafka/pending_message"

 module Kafka

   # Allows sending messages to a Kafka cluster.
   #
@@ -15,11 +15,11 @@
   #
   #     # Will instantiate Kafka::Producer
   #     producer = kafka.get_producer
   #
   # This is done in order to share a logger as well as a pool of broker connections across
-  # different producers. This also means that you don't need to pass the `broker_pool` and
+  # different producers. This also means that you don't need to pass the `cluster` and
   # `logger` options to `#get_producer`. See {#initialize} for the list of other options
   # you can pass in.
   #
   # ## Buffering
   #
@@ -84,12 +84,11 @@
   #
   class Producer

     # Initializes a new Producer.
     #
-    # @param broker_pool [BrokerPool] the broker pool representing the cluster.
-    #   Typically passed in for you.
+    # @param cluster [Cluster] the cluster client. Typically passed in for you.
     #
     # @param logger [Logger] the logger that should be used. Typically passed
     #   in for you.
     #
     # @param ack_timeout [Integer] The number of seconds a broker can wait for
@@ -105,19 +104,24 @@
     # @param retry_backoff [Integer] the number of seconds to wait between retries.
     #
     # @param max_buffer_size [Integer] the number of messages allowed in the buffer
     #   before new writes will raise BufferOverflow exceptions.
     #
-    def initialize(broker_pool:, logger:, ack_timeout: 5, required_acks: 1, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000)
-      @broker_pool = broker_pool
+    def initialize(cluster:, logger:, ack_timeout: 5, required_acks: 1, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000)
+      @cluster = cluster
       @logger = logger
       @required_acks = required_acks
       @ack_timeout = ack_timeout
       @max_retries = max_retries
       @retry_backoff = retry_backoff
       @max_buffer_size = max_buffer_size
+
+      # A buffer organized by topic/partition.
       @buffer = MessageBuffer.new
+
+      # Messages added by `#produce` but not yet assigned a partition.
+      @pending_messages = []
     end

     # Produces a message to the specified topic. Note that messages are buffered in
     # the producer until {#send_messages} is called.
     #
@@ -150,22 +154,19 @@
     def produce(value, key: nil, topic:, partition: nil, partition_key: nil)
       unless buffer_size < @max_buffer_size
         raise BufferOverflow, "Max buffer size #{@max_buffer_size} exceeded"
       end

-      if partition.nil?
-        # If no explicit partition key is specified we use the message key instead.
-        partition_key ||= key
-        partitioner = Partitioner.new(@broker_pool.partitions_for(topic))
-        partition = partitioner.partition_for_key(partition_key)
-      end
+      @pending_messages << PendingMessage.new(
+        value: value,
+        key: key,
+        topic: topic,
+        partition: partition,
+        partition_key: partition_key,
+      )

-      message = Protocol::Message.new(key: key, value: value)
-
-      @buffer.write(message, topic: topic, partition: partition)
-
-      partition
+      nil
     end

     # Sends all buffered messages to the Kafka brokers.
     #
     # Depending on the value of `required_acks` used when initializing the producer,
@@ -176,34 +177,38 @@
     # @raise [FailedToSendMessages] if not all messages could be successfully sent.
     # @return [nil]
     def send_messages
       attempt = 0

-      transmission = Transmission.new(
-        broker_pool: @broker_pool,
+      # Make sure we get metadata for this topic.
+      target_topics = @pending_messages.map(&:topic).uniq
+      @cluster.add_target_topics(target_topics)
+
+      operation = ProduceOperation.new(
+        cluster: @cluster,
         buffer: @buffer,
         required_acks: @required_acks,
         ack_timeout: @ack_timeout,
         logger: @logger,
       )

       loop do
-        @logger.info "Sending #{@buffer.size} messages"
-
         attempt += 1
-        transmission.send_messages

-        if @buffer.empty?
-          @logger.info "Successfully transmitted all messages"
+        @cluster.refresh_metadata_if_necessary!
+
+        assign_partitions!
+        operation.execute
+
+        if @pending_messages.empty? && @buffer.empty?
           break
         elsif attempt <= @max_retries
-          @logger.warn "Failed to transmit all messages, retry #{attempt} of #{@max_retries}"
-          @logger.info "Waiting #{@retry_backoff}s before retrying"
+          @logger.warn "Failed to send all messages; attempting retry #{attempt} of #{@max_retries} after #{@retry_backoff}s"

           sleep @retry_backoff
         else
-          @logger.error "Failed to transmit all messages; keeping remaining messages in buffer"
+          @logger.error "Failed to send all messages; keeping remaining messages in buffer"
           break
         end
       end

       if @required_acks == 0
@@ -222,16 +227,45 @@

     # Returns the number of messages currently held in the buffer.
     #
     # @return [Integer] buffer size.
     def buffer_size
-      @buffer.size
+      @pending_messages.size + @buffer.size
     end

     # Closes all connections to the brokers.
     #
     # @return [nil]
     def shutdown
-      @broker_pool.shutdown
+      @cluster.disconnect
+    end
+
+    private
+
+    def assign_partitions!
+      until @pending_messages.empty?
+        # We want to keep the message in the first-stage buffer in case there's an error.
+        message = @pending_messages.first
+
+        partition = message.partition
+
+        if partition.nil?
+          partition_count = @cluster.partitions_for(message.topic).count
+          partition = Partitioner.partition_for_key(partition_count, message)
+        end
+
+        @buffer.write(
+          value: message.value,
+          key: message.key,
+          topic: message.topic,
+          partition: partition,
+        )
+
+        # Now it's safe to remove the message from the first-stage buffer.
+        @pending_messages.shift
+      end
+    rescue Kafka::Error => e
+      @logger.error "Failed to assign pending message to a partition: #{e}"
+      @cluster.mark_as_stale!
     end
   end
 end