lib/kafka/producer.rb in ruby-kafka-0.1.0.pre.alpha2 vs lib/kafka/producer.rb in ruby-kafka-0.1.0.pre.beta1
- old
+ new
@@ -1,26 +1,67 @@
require "kafka/message"
require "kafka/message_set"
+require "kafka/partitioner"
module Kafka
class Producer
- # @param timeout [Integer] The number of milliseconds to wait for an
+ # @param timeout [Integer] The number of seconds to wait for an
# acknowledgement from the broker before timing out.
# @param required_acks [Integer] The number of replicas that must acknowledge
# a write.
- def initialize(broker_pool:, logger:, timeout: 10_000, required_acks: 1)
+ def initialize(broker_pool:, logger:, timeout: 10, required_acks: 1)
@broker_pool = broker_pool
@logger = logger
@required_acks = required_acks
@timeout = timeout
@buffered_messages = []
end
- def write(value, key:, topic:, partition:)
- @buffered_messages << Message.new(value, key: key, topic: topic, partition: partition)
+ # Writes a message to the specified topic. Note that messages are buffered in
+ # the producer until {#flush} is called.
+ #
+ # == Partitioning
+ #
+ # There are several options for specifying the partition that the message should
+ # be written to. The simplest option is to not specify a partition or partition
+ # key, in which case the message key will be used to select one of the available
+ # partitions. You can also specify the `partition` parameter yourself. This
+ # requires you to know which partitions are available, however. Oftentimes the
+ # best option is to specify the `partition_key` parameter: messages with the
+ # same partition key will always be assigned to the same partition, as long as
+ # the number of partitions doesn't change.
+ #
+ # @param value [String] the message data.
+ # @param key [String] the message key.
+ # @param topic [String] the topic that the message should be written to.
+ # @param partition [Integer] the partition that the message should be written to.
+ # @param partition_key [String] the key that should be used to assign a partition.
+ #
+ # @return [Message] the message that was written.
+ def write(value, key:, topic:, partition: nil, partition_key: nil)
+ if partition.nil?
+ # If no explicit partition key is specified, we use the message key instead.
+ partition_key ||= key
+ partitioner = Partitioner.new(@broker_pool.partitions_for(topic))
+ partition = partitioner.partition_for_key(partition_key)
+ end
+
+ message = Message.new(value, key: key, topic: topic, partition: partition)
+
+ @buffered_messages << message
+
+ message
end
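As an illustration of the three partitioning options documented above, here is a hypothetical usage sketch; the `producer` instance and the topic, key, and partition values are invented for the example and are not part of this diff:

    # Simplest: pass neither partition nor partition_key; the message key is
    # then used to pick one of the topic's partitions.
    producer.write("hello", key: "greeting", topic: "greetings")

    # Explicit partition; requires knowing which partitions the topic has.
    producer.write("hello", key: "greeting", topic: "greetings", partition: 0)

    # Partition key: messages with the same partition key always land on the
    # same partition, as long as the partition count doesn't change.
    producer.write("hello", key: "greeting", topic: "greetings", partition_key: "user-42")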
+ # Flushes all messages to the Kafka brokers.
+ #
+ # Depending on the value of `required_acks` used when initializing the producer,
+ # this call may block until the specified number of replicas have acknowledged
+ # the writes. The `timeout` setting places an upper bound on the amount of time
+ # the call will block before failing.
+ #
+ # @return [nil]
def flush
messages_for_broker = {}
@buffered_messages.each do |message|
broker = @broker_pool.get_leader(message.topic, message.partition)
@@ -35,11 +76,11 @@
message_set = MessageSet.new(messages)
response = broker.produce(
messages_for_topics: message_set.to_h,
required_acks: @required_acks,
- timeout: @timeout,
+ timeout: @timeout * 1000, # Kafka expects the timeout in milliseconds.
)
if response
response.topics.each do |topic_info|
topic_info.partitions.each do |partition_info|
@@ -48,8 +89,14 @@
end
end
end
@buffered_messages.clear
+
+ nil
+ end
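The timeout is now given in seconds when constructing the producer and converted to milliseconds only when the request is sent to Kafka (note the `@timeout * 1000` conversion above). A rough construction-and-flush sketch under that assumption; `broker_pool` stands in for a pool of broker connections built elsewhere and is not shown in this diff:

    require "logger"

    producer = Kafka::Producer.new(
      broker_pool: broker_pool,   # assumed: broker connection pool built elsewhere
      logger: Logger.new($stderr),
      required_acks: 1,           # wait for the partition leader's acknowledgement
      timeout: 10,                # seconds; previously the default was 10_000 ms
    )

    producer.write("event payload", key: "event-1", topic: "events")

    # May block until `required_acks` acknowledgements arrive, bounded by `timeout`.
    producer.flush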
+
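+ # Shuts down the broker pool, closing any open connections to the Kafka brokers.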
+ def shutdown
+ @broker_pool.shutdown
end
end
end
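Finally, a sketch of where the new `#shutdown` call fits once the producer is no longer needed; `producer` is the hypothetical instance from the sketches above:

    begin
      producer.write("bye", key: "farewell", topic: "greetings")
      producer.flush
    ensure
      producer.shutdown   # release the broker connections even if the flush fails
    end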