lib/kafka/producer.rb in ruby-kafka-0.1.0.pre.beta2 vs lib/kafka/producer.rb in ruby-kafka-0.1.0.pre.beta3
- old
+ new
@@ -6,23 +6,23 @@
# Allows sending messages to a Kafka cluster.
#
# == Buffering
#
- # The producer buffers pending messages until {#flush} is called. Note that there is
+ # The producer buffers pending messages until {#send_messages} is called. Note that there is
# a maximum buffer size (default is 1,000 messages) and writing messages after the
# buffer has reached this size will result in a BufferOverflow exception. Make sure
- # to periodically call {#flush} or set +max_buffer_size+ to an appropriate value.
+ # to periodically call {#send_messages} or set +max_buffer_size+ to an appropriate value.
#
# Buffering messages and sending them in batches greatly improves performance, so
- # try to avoid flushing after every write. The tradeoff between throughput and
+ # try to avoid sending messages after every write. The tradeoff between throughput and
# message delays depends on your use case.
#
# == Error Handling and Retries
#
# The design of the error handling is based on having a {MessageBuffer} hold messages
- # for all topics/partitions. Whenever we want to flush messages to the cluster, we
+ # for all topics/partitions. Whenever we want to send messages to the cluster, we
# group the buffered messages by the broker they need to be sent to and fire off a
# request to each broker. A request can be a partial success, so we go through the
# response and inspect the error code for each partition that we wrote to. If the
# write to a given partition was successful, we clear the corresponding messages
# from the buffer -- otherwise, we log the error and keep the messages in the buffer.
@@ -37,11 +37,11 @@
#
# @param broker_pool [BrokerPool] the broker pool representing the cluster.
#
# @param logger [Logger]
#
- # @param timeout [Integer] The number of seconds a broker can wait for
+ # @param ack_timeout [Integer] The number of seconds a broker can wait for
# replicas to acknowledge a write before responding with a timeout.
#
# @param required_acks [Integer] The number of replicas that must acknowledge
# a write.
#
@@ -52,23 +52,23 @@
# @param retry_backoff [Integer] the number of seconds to wait between retries.
#
# @param max_buffer_size [Integer] the number of messages allowed in the buffer
# before new writes will raise BufferOverflow exceptions.
#
- def initialize(broker_pool:, logger:, timeout: 10, required_acks: 1, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000)
+ def initialize(broker_pool:, logger:, ack_timeout: 10, required_acks: 1, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000)
@broker_pool = broker_pool
@logger = logger
@required_acks = required_acks
- @timeout = timeout
+ @ack_timeout = ack_timeout
@max_retries = max_retries
@retry_backoff = retry_backoff
@max_buffer_size = max_buffer_size
@buffer = MessageBuffer.new
end
- # Writes a message to the specified topic. Note that messages are buffered in
- # the producer until {#flush} is called.
+ # Produces a message to the specified topic. Note that messages are buffered in
+ # the producer until {#send_messages} is called.
#
# == Partitioning
#
# There are several options for specifying the partition that the message should
# be written to.
@@ -92,11 +92,11 @@
# @param partition [Integer] the partition that the message should be written to.
# @param partition_key [String] the key that should be used to assign a partition.
#
# @raise [BufferOverflow] if the maximum buffer size has been reached.
# @return [nil]
- def write(value, key: nil, topic:, partition: nil, partition_key: nil)
+ def produce(value, key: nil, topic:, partition: nil, partition_key: nil)
unless buffer_size < @max_buffer_size
raise BufferOverflow, "Max buffer size #{@max_buffer_size} exceeded"
end
if partition.nil?
@@ -111,24 +111,24 @@
@buffer.write(message, topic: topic, partition: partition)
partition
end
- # Flushes all messages to the Kafka brokers.
+ # Sends all buffered messages to the Kafka brokers.
#
# Depending on the value of +required_acks+ used when initializing the producer,
# this call may block until the specified number of replicas have acknowledged
- # the writes. The +timeout+ setting places an upper bound on the amount of time
- # the call will block before failing.
+ # the writes. The +ack_timeout+ setting places an upper bound on the amount of
+ # time the call will block before failing.
#
# @raise [FailedToSendMessages] if not all messages could be successfully sent.
# @return [nil]
- def flush
+ def send_messages
attempt = 0
loop do
- @logger.info "Flushing #{@buffer.size} messages"
+ @logger.info "Sending #{@buffer.size} messages"
attempt += 1
transmit_messages
if @buffer.empty?
@@ -164,10 +164,13 @@
# @return [Integer] buffer size.
def buffer_size
@buffer.size
end
+ # Closes all connections to the brokers.
+ #
+ # @return [nil]
def shutdown
@broker_pool.shutdown
end
private
@@ -189,11 +192,11 @@
broker = @broker_pool.get_broker(broker_id)
response = broker.produce(
messages_for_topics: message_set.to_h,
required_acks: @required_acks,
- timeout: @timeout * 1000, # Kafka expects the timeout in milliseconds.
+ timeout: @ack_timeout * 1000, # Kafka expects the timeout in milliseconds.
)
handle_response(response) if response
rescue ConnectionError => e
@logger.error "Could not connect to broker #{broker_id}: #{e}"
@@ -227,10 +230,10 @@
@logger.error "Not enough in-sync replicas for #{topic}/#{partition}"
rescue Kafka::NotEnoughReplicasAfterAppend
@logger.error "Messages written, but to fewer in-sync replicas than required for #{topic}/#{partition}"
else
offset = partition_info.offset
- @logger.info "Successfully flushed messages for #{topic}/#{partition}; new offset is #{offset}"
+ @logger.info "Successfully sent messages for #{topic}/#{partition}; new offset is #{offset}"
# The messages were successfully written; clear them from the buffer.
@buffer.clear_messages(topic: topic, partition: partition)
end
end