lib/kafka/producer.rb in ruby-kafka-0.1.0.pre.beta3 vs lib/kafka/producer.rb in ruby-kafka-0.1.0.pre.beta4
- old
+ new
@@ -4,22 +4,22 @@
module Kafka
# Allows sending messages to a Kafka cluster.
#
- # == Buffering
+ # ## Buffering
#
# The producer buffers pending messages until {#send_messages} is called. Note that there is
# a maximum buffer size (default is 1,000 messages) and writing messages after the
# buffer has reached this size will result in a BufferOverflow exception. Make sure
- # to periodically call {#send_messages} or set +max_buffer_size+ to an appropriate value.
+ # to periodically call {#send_messages} or set `max_buffer_size` to an appropriate value.
#
# Buffering messages and sending them in batches greatly improves performance, so
# try to avoid sending messages after every write. The tradeoff between throughput and
# message delays depends on your use case.
#
- # == Error Handling and Retries
+ # ## Error Handling and Retries
#
# The design of the error handling is based on having a {MessageBuffer} hold messages
# for all topics/partitions. Whenever we want to send messages to the cluster, we
# group the buffered messages by the broker they need to be sent to and fire off a
# request to each broker. A request can be a partial success, so we go through the
@@ -27,12 +27,48 @@
# write to a given partition was successful, we clear the corresponding messages
# from the buffer -- otherwise, we log the error and keep the messages in the buffer.
#
# After this, we check if the buffer is empty. If it is, we're all done. If it's
# not, we do another round of requests, this time with just the remaining messages.
- # We do this for as long as +max_retries+ permits.
+ # We do this for as long as `max_retries` permits.
#
+ # ## Example
+ #
+ # This is an example of an application which reads lines from stdin and writes them
+ # to Kafka:
+ #
+ # require "kafka"
+ #
+ # logger = Logger.new($stderr)
+ # brokers = ENV.fetch("KAFKA_BROKERS").split(",")
+ #
+ # # Make sure to create this topic in your Kafka cluster or configure the
+ # # cluster to auto-create topics.
+ # topic = "random-messages"
+ #
+ # kafka = Kafka.new(
+ # seed_brokers: brokers,
+ # client_id: "simple-producer",
+ # logger: logger,
+ # )
+ #
+ # producer = kafka.get_producer
+ #
+ # begin
+ # $stdin.each_with_index do |line, index|
+ # producer.produce(line, topic: topic)
+ #
  #       # Send messages every 10 lines.
+ # producer.send_messages if index % 10 == 0
+ # end
+ # ensure
+ # # Make sure to send any remaining messages.
+ # producer.send_messages
+ #
+ # producer.shutdown
+ # end
+ #
class Producer
# Initializes a new Producer.
#
# @param broker_pool [BrokerPool] the broker pool representing the cluster.
@@ -66,22 +102,22 @@
end
# Produces a message to the specified topic. Note that messages are buffered in
# the producer until {#send_messages} is called.
#
- # == Partitioning
+ # ## Partitioning
#
# There are several options for specifying the partition that the message should
# be written to.
#
# The simplest option is to not specify a message key, partition key, or
# partition number, in which case the message will be assigned a partition at
# random.
#
- # You can also specify the +partition+ parameter yourself. This requires you to
+ # You can also specify the `partition` parameter yourself. This requires you to
# know which partitions are available, however. Oftentimes the best option is
- # to specify the +partition_key+ parameter: messages with the same partition
+ # to specify the `partition_key` parameter: messages with the same partition
# key will always be assigned to the same partition, as long as the number of
# partitions doesn't change. You can also omit the partition key and specify
# a message key instead. The message key is part of the message payload, and
# so can carry semantic value--whether you want to have the message key double
# as a partition key is up to you.
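The partition-key behavior above can be sketched with a simple hash-then-modulo scheme. This is a hypothetical stand-in for the library's partitioner, only meant to show why equal keys land on the same partition while the partition count is fixed:

```ruby
require "zlib"

# Illustrative key-based assignment: hash the partition key and take the
# result modulo the number of partitions. Deterministic, so the same key
# always yields the same partition for a fixed partition count.
def partition_for(partition_key, num_partitions)
  Zlib.crc32(partition_key) % num_partitions
end

same1 = partition_for("user-42", 8)
same2 = partition_for("user-42", 8)
# same1 == same2 — messages keyed by "user-42" stay together.
```

Note the caveat from the docs: if the number of partitions changes, the modulo changes too, and keys may map to different partitions afterwards.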
@@ -113,12 +149,12 @@
partition
end
# Sends all buffered messages to the Kafka brokers.
#
- # Depending on the value of +required_acks+ used when initializing the producer,
+ # Depending on the value of `required_acks` used when initializing the producer,
# this call may block until the specified number of replicas have acknowledged
- # the writes. The +ack_timeout+ setting places an upper bound on the amount of
+ # the writes. The `ack_timeout` setting places an upper bound on the amount of
# time the call will block before failing.
#
# @raise [FailedToSendMessages] if not all messages could be successfully sent.
# @return [nil]
def send_messages
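The blocking behavior of `send_messages` can be illustrated with Ruby's stdlib `Timeout` module. This is a simulation of the semantics only — `wait_for_acks` is a hypothetical helper, not code from the gem: the wait for acknowledgements is bounded by `ack_timeout`, and exceeding it surfaces as a send failure.

```ruby
require "timeout"

# Hypothetical sketch: block until the supplied wait completes, but never
# longer than `ack_timeout` seconds.
def wait_for_acks(ack_timeout:, &wait)
  Timeout.timeout(ack_timeout, &wait)
rescue Timeout::Error
  # The real producer raises FailedToSendMessages in this situation.
  raise "FailedToSendMessages: acks not received within ack_timeout"
end

# A prompt acknowledgement returns normally:
result = wait_for_acks(ack_timeout: 1) { :acked }

# A slow one raises once ack_timeout elapses:
# wait_for_acks(ack_timeout: 0.01) { sleep 1 }
```

Tuning `required_acks` trades durability for latency: the more replicas that must acknowledge, the longer this call can block before the timeout bound kicks in.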