lib/kafka/producer.rb in ruby-kafka-0.1.0.pre.beta3 vs lib/kafka/producer.rb in ruby-kafka-0.1.0.pre.beta4

- old
+ new

@@ -4,22 +4,22 @@
 module Kafka

   # Allows sending messages to a Kafka cluster.
   #
-  # == Buffering
+  # ## Buffering
   #
   # The producer buffers pending messages until {#send_messages} is called. Note that there is
   # a maximum buffer size (default is 1,000 messages) and writing messages after the
   # buffer has reached this size will result in a BufferOverflow exception. Make sure
-  # to periodically call {#send_messages} or set +max_buffer_size+ to an appropriate value.
+  # to periodically call {#send_messages} or set `max_buffer_size` to an appropriate value.
   #
   # Buffering messages and sending them in batches greatly improves performance, so
   # try to avoid sending messages after every write. The tradeoff between throughput and
   # message delays depends on your use case.
   #
-  # == Error Handling and Retries
+  # ## Error Handling and Retries
   #
   # The design of the error handling is based on having a {MessageBuffer} hold messages
   # for all topics/partitions. Whenever we want to send messages to the cluster, we
   # group the buffered messages by the broker they need to be sent to and fire off a
   # request to each broker. A request can be a partial success, so we go through the
@@ -27,12 +27,48 @@
   # write to a given partition was successful, we clear the corresponding messages
   # from the buffer -- otherwise, we log the error and keep the messages in the buffer.
   #
   # After this, we check if the buffer is empty. If it is, we're all done. If it's
   # not, we do another round of requests, this time with just the remaining messages.
-  # We do this for as long as +max_retries+ permits.
+  # We do this for as long as `max_retries` permits.
   #
+  # ## Example
+  #
+  # This is an example of an application which reads lines from stdin and writes them
+  # to Kafka:
+  #
+  #     require "kafka"
+  #
+  #     logger = Logger.new($stderr)
+  #     brokers = ENV.fetch("KAFKA_BROKERS").split(",")
+  #
+  #     # Make sure to create this topic in your Kafka cluster or configure the
+  #     # cluster to auto-create topics.
+  #     topic = "random-messages"
+  #
+  #     kafka = Kafka.new(
+  #       seed_brokers: brokers,
+  #       client_id: "simple-producer",
+  #       logger: logger,
+  #     )
+  #
+  #     producer = kafka.get_producer
+  #
+  #     begin
+  #       $stdin.each_with_index do |line, index|
+  #         producer.produce(line, topic: topic)
+  #
+  #         # Send messages for every 10 lines.
+  #         producer.send_messages if index % 10 == 0
+  #       end
+  #     ensure
+  #       # Make sure to send any remaining messages.
+  #       producer.send_messages
+  #
+  #       producer.shutdown
+  #     end
+  #
   class Producer

     # Initializes a new Producer.
     #
     # @param broker_pool [BrokerPool] the broker pool representing the cluster.
@@ -66,22 +102,22 @@
     end

     # Produces a message to the specified topic. Note that messages are buffered in
     # the producer until {#send_messages} is called.
     #
-    # == Partitioning
+    # ## Partitioning
     #
     # There are several options for specifying the partition that the message should
     # be written to.
     #
     # The simplest option is to not specify a message key, partition key, or
     # partition number, in which case the message will be assigned a partition at
     # random.
     #
-    # You can also specify the +partition+ parameter yourself. This requires you to
+    # You can also specify the `partition` parameter yourself. This requires you to
     # know which partitions are available, however. Oftentimes the best option is
-    # to specify the +partition_key+ parameter: messages with the same partition
+    # to specify the `partition_key` parameter: messages with the same partition
     # key will always be assigned to the same partition, as long as the number of
     # partitions doesn't change. You can also omit the partition key and specify
     # a message key instead. The message key is part of the message payload, and
     # so can carry semantic value--whether you want to have the message key double
     # as a partition key is up to you.
@@ -113,12 +149,12 @@
       partition
     end
     # Sends all buffered messages to the Kafka brokers.
     #
-    # Depending on the value of +required_acks+ used when initializing the producer,
+    # Depending on the value of `required_acks` used when initializing the producer,
     # this call may block until the specified number of replicas have acknowledged
-    # the writes. The +ack_timeout+ setting places an upper bound on the amount of
+    # the writes. The `ack_timeout` setting places an upper bound on the amount of
     # time the call will block before failing.
     #
     # @raise [FailedToSendMessages] if not all messages could be successfully sent.
     # @return [nil]
     def send_messages
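
The buffering rules documented in the first hunk can be exercised directly. A minimal sketch, assuming `get_producer` forwards a `max_buffer_size` option to the producer (the option is named in the docs, though its plumbing isn't shown in this diff) and that the overflow error is raised as `Kafka::BufferOverflow`:

    require "kafka"

    kafka = Kafka.new(
      seed_brokers: ["kafka1:9092"],
      client_id: "buffering-example",
    )

    # Assumption: get_producer forwards max_buffer_size to the producer.
    producer = kafka.get_producer(max_buffer_size: 100)

    begin
      1_000.times do |i|
        producer.produce("message #{i}", topic: "greetings")

        # Flush well before the 100-message buffer fills up.
        producer.send_messages if (i + 1) % 50 == 0
      end
    rescue Kafka::BufferOverflow
      # produce was called while the buffer was already full; flushing
      # with send_messages makes room again.
      producer.send_messages
    ensure
      producer.send_messages
      producer.shutdown
    end

Flushing in batches rather than after every `produce` is the point of the buffer: fewer, larger requests trade a little latency for throughput.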
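The partitioning options described in the `#produce` documentation map to keyword arguments. A sketch assuming `produce` accepts `key:`, `partition:`, and `partition_key:` as the comment text implies (only `produce(value, topic:)` appears verbatim in this diff):

    # No key, partition key, or partition number: a random partition.
    producer.produce("hello", topic: "greetings")

    # Explicit partition number: requires knowing which partitions exist.
    producer.produce("hello", topic: "greetings", partition: 0)

    # Partition key: messages with the same partition key always land on
    # the same partition, as long as the partition count doesn't change.
    producer.produce("hello", topic: "greetings", partition_key: "user-42")

    # Message key: part of the payload, and doubles as a partition key
    # when no separate partition key is given.
    producer.produce("hello", topic: "greetings", key: "user-42")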
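Likewise, the acknowledgement behavior of `#send_messages` is configured when the producer is created. A sketch, again assuming `get_producer` forwards `required_acks` and `ack_timeout` (both named in the docs; the -1 convention for "all in-sync replicas" and the seconds unit are standard Kafka semantics, not confirmed by this diff), and that `FailedToSendMessages` is namespaced under `Kafka`:

    producer = kafka.get_producer(
      required_acks: -1, # wait for all in-sync replicas to acknowledge
      ack_timeout: 5,    # stop waiting for acks after 5 seconds
    )

    producer.produce("important message", topic: "greetings")

    begin
      # May block until the required acknowledgements arrive or ack_timeout
      # passes; failed writes are retried up to max_retries times.
      producer.send_messages
    rescue Kafka::FailedToSendMessages
      # Not every buffered message could be delivered within the retry budget.
    end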