lib/kafka/producer.rb in ruby-kafka-0.1.0.pre.beta4 vs lib/kafka/producer.rb in ruby-kafka-0.1.0.pre.beta5
- old
+ new
@@ -1,8 +1,9 @@
require "kafka/partitioner"
require "kafka/message_buffer"
require "kafka/protocol/message"
+require "kafka/transmission"
module Kafka
# Allows sending messages to a Kafka cluster.
#
@@ -159,15 +160,23 @@
# @raise [FailedToSendMessages] if not all messages could be successfully sent.
# @return [nil]
def send_messages
attempt = 0
+ transmission = Transmission.new(
+ broker_pool: @broker_pool,
+ buffer: @buffer,
+ required_acks: @required_acks,
+ ack_timeout: @ack_timeout,
+ logger: @logger,
+ )
+
loop do
@logger.info "Sending #{@buffer.size} messages"
attempt += 1
- transmit_messages
+ transmission.send_messages
if @buffer.empty?
@logger.info "Successfully transmitted all messages"
break
elsif attempt <= @max_retries
@@ -205,75 +214,8 @@
# Closes all connections to the brokers.
#
# @return [nil]
def shutdown
@broker_pool.shutdown
- end
-
- private
-
- def transmit_messages
- messages_for_broker = {}
-
- @buffer.each do |topic, partition, messages|
- broker_id = @broker_pool.get_leader_id(topic, partition)
-
- @logger.debug "Current leader for #{topic}/#{partition} is node #{broker_id}"
-
- messages_for_broker[broker_id] ||= MessageBuffer.new
- messages_for_broker[broker_id].concat(messages, topic: topic, partition: partition)
- end
-
- messages_for_broker.each do |broker_id, message_set|
- begin
- broker = @broker_pool.get_broker(broker_id)
-
- response = broker.produce(
- messages_for_topics: message_set.to_h,
- required_acks: @required_acks,
- timeout: @ack_timeout * 1000, # Kafka expects the timeout in milliseconds.
- )
-
- handle_response(response) if response
- rescue ConnectionError => e
- @logger.error "Could not connect to broker #{broker_id}: #{e}"
-
- # Mark the broker pool as stale in order to force a cluster metadata refresh.
- @broker_pool.mark_as_stale!
- end
- end
- end
-
- def handle_response(response)
- response.each_partition do |topic_info, partition_info|
- topic = topic_info.topic
- partition = partition_info.partition
-
- begin
- Protocol.handle_error(partition_info.error_code)
- rescue Kafka::CorruptMessage
- @logger.error "Corrupt message when writing to #{topic}/#{partition}"
- rescue Kafka::UnknownTopicOrPartition
- @logger.error "Unknown topic or partition #{topic}/#{partition}"
- rescue Kafka::LeaderNotAvailable
- @logger.error "Leader currently not available for #{topic}/#{partition}"
- @broker_pool.mark_as_stale!
- rescue Kafka::NotLeaderForPartition
- @logger.error "Broker not currently leader for #{topic}/#{partition}"
- @broker_pool.mark_as_stale!
- rescue Kafka::RequestTimedOut
- @logger.error "Timed out while writing to #{topic}/#{partition}"
- rescue Kafka::NotEnoughReplicas
- @logger.error "Not enough in-sync replicas for #{topic}/#{partition}"
- rescue Kafka::NotEnoughReplicasAfterAppend
- @logger.error "Messages written, but to fewer in-sync replicas than required for #{topic}/#{partition}"
- else
- offset = partition_info.offset
- @logger.info "Successfully sent messages for #{topic}/#{partition}; new offset is #{offset}"
-
- # The messages were successfully written; clear them from the buffer.
- @buffer.clear_messages(topic: topic, partition: partition)
- end
- end
end
end
end