lib/kafka/producer.rb in ruby-kafka-0.1.0.pre.beta1 vs lib/kafka/producer.rb in ruby-kafka-0.1.0.pre.beta2

- old
+ new

require "kafka/partitioner"
require "kafka/message_buffer"
require "kafka/protocol/message"

module Kafka

  # Allows sending messages to a Kafka cluster.
  #
  # == Buffering
  #
  # The producer buffers pending messages until {#flush} is called. Note that there is
  # a maximum buffer size (default is 1,000 messages) and writing messages after the
  # buffer has reached this size will result in a BufferOverflow exception. Make sure
  # to periodically call {#flush} or set +max_buffer_size+ to an appropriate value.
  #
  # Buffering messages and sending them in batches greatly improves performance, so
  # try to avoid flushing after every write. The tradeoff between throughput and
  # message delays depends on your use case.
  #
  # == Error Handling and Retries
  #
  # The design of the error handling is based on having a {MessageBuffer} hold messages
  # for all topics/partitions. Whenever we want to flush messages to the cluster, we
  # group the buffered messages by the broker they need to be sent to and fire off a
  # request to each broker. A request can be a partial success, so we go through the
  # response and inspect the error code for each partition that we wrote to. If the
  # write to a given partition was successful, we clear the corresponding messages
  # from the buffer -- otherwise, we log the error and keep the messages in the buffer.
  #
  # After this, we check if the buffer is empty. If it is, we're all done. If it's
  # not, we do another round of requests, this time with just the remaining messages.
  # We do this for as long as +max_retries+ permits.
  #
  # When +required_acks+ is 0 the brokers return no response, so a request that was
  # handed to a broker without a connection error is treated as delivered and its
  # messages are removed from the buffer immediately. Only messages whose broker
  # could not be reached are retried, and whatever is still buffered after the last
  # attempt is dropped rather than reported.
  #
  class Producer

    # Initializes a new Producer.
    #
    # @param broker_pool [BrokerPool] the broker pool representing the cluster.
    #
    # @param logger [Logger]
    #
    # @param timeout [Integer] The number of seconds a broker can wait for
    #   replicas to acknowledge a write before responding with a timeout.
    #
    # @param required_acks [Integer] The number of replicas that must acknowledge
    #   a write.
    #
    # @param max_retries [Integer] the number of retries that should be attempted
    #   before giving up sending messages to the cluster. Does not include the
    #   original attempt.
    #
    # @param retry_backoff [Integer] the number of seconds to wait between retries.
    #
    # @param max_buffer_size [Integer] the number of messages allowed in the buffer
    #   before new writes will raise BufferOverflow exceptions.
    #
    def initialize(broker_pool:, logger:, timeout: 10, required_acks: 1, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000)
      @broker_pool = broker_pool
      @logger = logger
      @required_acks = required_acks
      @timeout = timeout
      @max_retries = max_retries
      @retry_backoff = retry_backoff
      @max_buffer_size = max_buffer_size
      @buffer = MessageBuffer.new
    end

    # Writes a message to the specified topic. Note that messages are buffered in
    # the producer until {#flush} is called.
    #
    # == Partitioning
    #
    # There are several options for specifying the partition that the message should
    # be written to.
    #
    # The simplest option is to not specify a message key, partition key, or
    # partition number, in which case the message will be assigned a partition at
    # random.
    #
    # You can also specify the +partition+ parameter yourself. This requires you to
    # know which partitions are available, however. Oftentimes the best option is
    # to specify the +partition_key+ parameter: messages with the same partition
    # key will always be assigned to the same partition, as long as the number of
    # partitions doesn't change. You can also omit the partition key and specify
    # a message key instead. The message key is part of the message payload, and
    # so can carry semantic value--whether you want to have the message key double
    # as a partition key is up to you.
    #
    # @param value [String] the message data.
    # @param key [String] the message key.
    # @param topic [String] the topic that the message should be written to.
    # @param partition [Integer] the partition that the message should be written to.
    # @param partition_key [String] the key that should be used to assign a partition.
    #
    # @raise [BufferOverflow] if the maximum buffer size has been reached.
    # @return [Integer] the partition the message was buffered for.
    def write(value, key: nil, topic:, partition: nil, partition_key: nil)
      unless buffer_size < @max_buffer_size
        raise BufferOverflow, "Max buffer size #{@max_buffer_size} exceeded"
      end

      if partition.nil?
        # If no explicit partition key is specified we use the message key instead.
        partition_key ||= key
        partitioner = Partitioner.new(@broker_pool.partitions_for(topic))
        partition = partitioner.partition_for_key(partition_key)
      end

      message = Protocol::Message.new(key: key, value: value)

      @buffer.write(message, topic: topic, partition: partition)

      partition
    end

    # Flushes all messages to the Kafka brokers.
    #
    # Depending on the value of +required_acks+ used when initializing the producer,
    # this call may block until the specified number of replicas have acknowledged
    # the writes. The +timeout+ setting places an upper bound on the amount of time
    # the call will block before failing.
    #
    # Messages that could not be written are retried up to +max_retries+ times,
    # sleeping +retry_backoff+ seconds between attempts.
    #
    # @raise [FailedToSendMessages] if not all messages could be successfully sent.
    # @return [nil]
    def flush
      attempt = 0

      loop do
        @logger.info "Flushing #{@buffer.size} messages"

        attempt += 1
        transmit_messages

        if @buffer.empty?
          @logger.info "Successfully transmitted all messages"
          break
        elsif attempt <= @max_retries
          @logger.warn "Failed to transmit all messages, retry #{attempt} of #{@max_retries}"
          @logger.info "Waiting #{@retry_backoff}s before retrying"

          sleep @retry_backoff
        else
          @logger.error "Failed to transmit all messages; keeping remaining messages in buffer"
          break
        end
      end

      if @required_acks == 0
        # Fire-and-forget mode never reports delivery failures to the caller.
        # Requests that reached a broker were already cleared while transmitting;
        # anything left here could not be handed to a broker within the allowed
        # attempts and is dropped instead of raising.
        @buffer.clear
      end

      unless @buffer.empty?
        partitions = @buffer.map { |topic, partition, _| "#{topic}/#{partition}" }.join(", ")

        raise FailedToSendMessages, "Failed to send messages to #{partitions}"
      end

      nil
    end

    # Returns the number of messages currently held in the buffer.
    #
    # @return [Integer] buffer size.
    def buffer_size
      @buffer.size
    end

    # Shuts down the underlying broker pool. Buffered messages are not flushed
    # by this call.
    def shutdown
      @broker_pool.shutdown
    end

    private

    # Performs a single delivery attempt: groups the buffered messages by the
    # current leader of each topic/partition and sends one produce request per
    # broker. Connection errors are logged and leave the affected messages in
    # the buffer so that {#flush} can retry them.
    def transmit_messages
      messages_for_broker = {}

      @buffer.each do |topic, partition, messages|
        broker_id = @broker_pool.get_leader_id(topic, partition)

        @logger.debug "Current leader for #{topic}/#{partition} is node #{broker_id}"

        messages_for_broker[broker_id] ||= MessageBuffer.new
        messages_for_broker[broker_id].concat(messages, topic: topic, partition: partition)
      end

      messages_for_broker.each do |broker_id, message_set|
        begin
          broker = @broker_pool.get_broker(broker_id)

          response = broker.produce(
            messages_for_topics: message_set.to_h,
            required_acks: @required_acks,
            timeout: @timeout * 1000, # Kafka expects the timeout in milliseconds.
          )

          if response
            handle_response(response)
          elsif @required_acks == 0
            # No response is returned by the brokers, so we can't know which
            # messages have been successfully written. The request went out
            # without a connection error, so assume it succeeded and clear it
            # now -- otherwise the retry loop would send the same messages again.
            clear_transmitted(message_set)
          end
        rescue ConnectionError => e
          @logger.error "Could not connect to broker #{broker_id}: #{e}"

          # Mark the broker pool as stale in order to force a cluster metadata refresh.
          @broker_pool.mark_as_stale!
        end
      end
    end

    # Removes from the main buffer every topic/partition contained in the given
    # per-broker message set.
    def clear_transmitted(message_set)
      message_set.each do |topic, partition, _messages|
        @buffer.clear_messages(topic: topic, partition: partition)
      end
    end

    # Inspects the per-partition error codes of a produce response. Partitions
    # that were written successfully are cleared from the buffer; known failures
    # are logged and their messages stay buffered. Leadership-related failures
    # additionally mark the broker pool as stale.
    def handle_response(response)
      response.each_partition do |topic_info, partition_info|
        topic = topic_info.topic
        partition = partition_info.partition

        begin
          Protocol.handle_error(partition_info.error_code)
        rescue Kafka::CorruptMessage
          @logger.error "Corrupt message when writing to #{topic}/#{partition}"
        rescue Kafka::UnknownTopicOrPartition
          @logger.error "Unknown topic or partition #{topic}/#{partition}"
        rescue Kafka::LeaderNotAvailable
          @logger.error "Leader currently not available for #{topic}/#{partition}"
          @broker_pool.mark_as_stale!
        rescue Kafka::NotLeaderForPartition
          @logger.error "Broker not currently leader for #{topic}/#{partition}"
          @broker_pool.mark_as_stale!
        rescue Kafka::RequestTimedOut
          @logger.error "Timed out while writing to #{topic}/#{partition}"
        rescue Kafka::NotEnoughReplicas
          @logger.error "Not enough in-sync replicas for #{topic}/#{partition}"
        rescue Kafka::NotEnoughReplicasAfterAppend
          @logger.error "Messages written, but to fewer in-sync replicas than required for #{topic}/#{partition}"
        else
          offset = partition_info.offset
          @logger.info "Successfully flushed messages for #{topic}/#{partition}; new offset is #{offset}"

          # The messages were successfully written; clear them from the buffer.
          @buffer.clear_messages(topic: topic, partition: partition)
        end
      end
    end
  end
end