lib/kafka/producer.rb in ruby-kafka-0.1.0.pre.alpha vs lib/kafka/producer.rb in ruby-kafka-0.1.0.pre.alpha2

- old
+ new

@@ -1,35 +1,55 @@ -require "kafka/protocol/message" +require "kafka/message" +require "kafka/message_set" module Kafka class Producer # @param timeout [Integer] The number of milliseconds to wait for an # acknowledgement from the broker before timing out. # @param required_acks [Integer] The number of replicas that must acknowledge # a write. - def initialize(cluster:, logger:, timeout: 10_000, required_acks: 1) - @cluster = cluster + def initialize(broker_pool:, logger:, timeout: 10_000, required_acks: 1) + @broker_pool = broker_pool @logger = logger @required_acks = required_acks @timeout = timeout - @buffer = {} + @buffered_messages = [] end def write(value, key:, topic:, partition:) - message = Protocol::Message.new(value: value, key: key) - - @buffer[topic] ||= {} - @buffer[topic][partition] ||= [] - @buffer[topic][partition] << message + @buffered_messages << Message.new(value, key: key, topic: topic, partition: partition) end def flush - @cluster.produce( - required_acks: @required_acks, - timeout: @timeout, - messages_for_topics: @buffer - ) + messages_for_broker = {} - @buffer = {} + @buffered_messages.each do |message| + broker = @broker_pool.get_leader(message.topic, message.partition) + + messages_for_broker[broker] ||= [] + messages_for_broker[broker] << message + end + + messages_for_broker.each do |broker, messages| + @logger.info "Sending #{messages.count} messages to broker #{broker}" + + message_set = MessageSet.new(messages) + + response = broker.produce( + messages_for_topics: message_set.to_h, + required_acks: @required_acks, + timeout: @timeout, + ) + + if response + response.topics.each do |topic_info| + topic_info.partitions.each do |partition_info| + Protocol.handle_error(partition_info.error_code) + end + end + end + end + + @buffered_messages.clear end end end