lib/kafka/producer.rb in ruby-kafka-0.1.0.pre.alpha vs lib/kafka/producer.rb in ruby-kafka-0.1.0.pre.alpha2
- old
+ new
@@ -1,35 +1,55 @@
-require "kafka/protocol/message"
+require "kafka/message"
+require "kafka/message_set"
module Kafka
class Producer
# @param timeout [Integer] The number of milliseconds to wait for an
# acknowledgement from the broker before timing out.
# @param required_acks [Integer] The number of replicas that must acknowledge
# a write.
- def initialize(cluster:, logger:, timeout: 10_000, required_acks: 1)
- @cluster = cluster
+ def initialize(broker_pool:, logger:, timeout: 10_000, required_acks: 1)
+ @broker_pool = broker_pool
@logger = logger
@required_acks = required_acks
@timeout = timeout
- @buffer = {}
+ @buffered_messages = []
end
def write(value, key:, topic:, partition:)
- message = Protocol::Message.new(value: value, key: key)
-
- @buffer[topic] ||= {}
- @buffer[topic][partition] ||= []
- @buffer[topic][partition] << message
+ @buffered_messages << Message.new(value, key: key, topic: topic, partition: partition)
end
def flush
- @cluster.produce(
- required_acks: @required_acks,
- timeout: @timeout,
- messages_for_topics: @buffer
- )
+ messages_for_broker = {}
- @buffer = {}
+ @buffered_messages.each do |message|
+ broker = @broker_pool.get_leader(message.topic, message.partition)
+
+ messages_for_broker[broker] ||= []
+ messages_for_broker[broker] << message
+ end
+
+ messages_for_broker.each do |broker, messages|
+ @logger.info "Sending #{messages.count} messages to broker #{broker}"
+
+ message_set = MessageSet.new(messages)
+
+ response = broker.produce(
+ messages_for_topics: message_set.to_h,
+ required_acks: @required_acks,
+ timeout: @timeout,
+ )
+
+ if response
+ response.topics.each do |topic_info|
+ topic_info.partitions.each do |partition_info|
+ Protocol.handle_error(partition_info.error_code)
+ end
+ end
+ end
+ end
+
+ @buffered_messages.clear
end
end
end