lib/kafka/producer.rb in ruby-kafka-0.1.0.pre.beta2 vs lib/kafka/producer.rb in ruby-kafka-0.1.0.pre.beta3

- old
+ new

@@ -6,23 +6,23 @@ # Allows sending messages to a Kafka cluster. # # == Buffering # - # The producer buffers pending messages until {#flush} is called. Note that there is + # The producer buffers pending messages until {#send_messages} is called. Note that there is # a maximum buffer size (default is 1,000 messages) and writing messages after the # buffer has reached this size will result in a BufferOverflow exception. Make sure - # to periodically call {#flush} or set +max_buffer_size+ to an appropriate value. + # to periodically call {#send_messages} or set +max_buffer_size+ to an appropriate value. # # Buffering messages and sending them in batches greatly improves performance, so - # try to avoid flushing after every write. The tradeoff between throughput and + # try to avoid sending messages after every write. The tradeoff between throughput and # message delays depends on your use case. # # == Error Handling and Retries # # The design of the error handling is based on having a {MessageBuffer} hold messages - # for all topics/partitions. Whenever we want to flush messages to the cluster, we + # for all topics/partitions. Whenever we want to send messages to the cluster, we # group the buffered messages by the broker they need to be sent to and fire off a # request to each broker. A request can be a partial success, so we go through the # response and inspect the error code for each partition that we wrote to. If the # write to a given partition was successful, we clear the corresponding messages # from the buffer -- otherwise, we log the error and keep the messages in the buffer. @@ -37,11 +37,11 @@ # # @param broker_pool [BrokerPool] the broker pool representing the cluster. # # @param logger [Logger] # - # @param timeout [Integer] The number of seconds a broker can wait for + # @param ack_timeout [Integer] The number of seconds a broker can wait for # replicas to acknowledge a write before responding with a timeout. # # @param required_acks [Integer] The number of replicas that must acknowledge # a write. # @@ -52,23 +52,23 @@ # @param retry_backoff [Integer] the number of seconds to wait between retries. # # @param max_buffer_size [Integer] the number of messages allowed in the buffer # before new writes will raise BufferOverflow exceptions. # - def initialize(broker_pool:, logger:, timeout: 10, required_acks: 1, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000) + def initialize(broker_pool:, logger:, ack_timeout: 10, required_acks: 1, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000) @broker_pool = broker_pool @logger = logger @required_acks = required_acks - @timeout = timeout + @ack_timeout = ack_timeout @max_retries = max_retries @retry_backoff = retry_backoff @max_buffer_size = max_buffer_size @buffer = MessageBuffer.new end - # Writes a message to the specified topic. Note that messages are buffered in - # the producer until {#flush} is called. + # Produces a message to the specified topic. Note that messages are buffered in + # the producer until {#send_messages} is called. # # == Partitioning # # There are several options for specifying the partition that the message should # be written to. @@ -92,11 +92,11 @@ # @param partition [Integer] the partition that the message should be written to. # @param partition_key [String] the key that should be used to assign a partition. # # @raise [BufferOverflow] if the maximum buffer size has been reached. # @return [nil] - def write(value, key: nil, topic:, partition: nil, partition_key: nil) + def produce(value, key: nil, topic:, partition: nil, partition_key: nil) unless buffer_size < @max_buffer_size raise BufferOverflow, "Max buffer size #{@max_buffer_size} exceeded" end if partition.nil? @@ -111,24 +111,24 @@ @buffer.write(message, topic: topic, partition: partition) partition end - # Flushes all messages to the Kafka brokers. + # Sends all buffered messages to the Kafka brokers. # # Depending on the value of +required_acks+ used when initializing the producer, # this call may block until the specified number of replicas have acknowledged - # the writes. The +timeout+ setting places an upper bound on the amount of time - # the call will block before failing. + # the writes. The +ack_timeout+ setting places an upper bound on the amount of + # time the call will block before failing. # # @raise [FailedToSendMessages] if not all messages could be successfully sent. # @return [nil] - def flush + def send_messages attempt = 0 loop do - @logger.info "Flushing #{@buffer.size} messages" + @logger.info "Sending #{@buffer.size} messages" attempt += 1 transmit_messages if @buffer.empty? @@ -164,10 +164,13 @@ # @return [Integer] buffer size. def buffer_size @buffer.size end + # Closes all connections to the brokers. + # + # @return [nil] def shutdown @broker_pool.shutdown end private @@ -189,11 +192,11 @@ broker = @broker_pool.get_broker(broker_id) response = broker.produce( messages_for_topics: message_set.to_h, required_acks: @required_acks, - timeout: @timeout * 1000, # Kafka expects the timeout in milliseconds. + timeout: @ack_timeout * 1000, # Kafka expects the timeout in milliseconds. ) handle_response(response) if response rescue ConnectionError => e @logger.error "Could not connect to broker #{broker_id}: #{e}" @@ -227,10 +230,10 @@ @logger.error "Not enough in-sync replicas for #{topic}/#{partition}" rescue Kafka::NotEnoughReplicasAfterAppend @logger.error "Messages written, but to fewer in-sync replicas than required for #{topic}/#{partition}" else offset = partition_info.offset - @logger.info "Successfully flushed messages for #{topic}/#{partition}; new offset is #{offset}" + @logger.info "Successfully sent messages for #{topic}/#{partition}; new offset is #{offset}" # The messages were successfully written; clear them from the buffer. @buffer.clear_messages(topic: topic, partition: partition) end end