lib/kafkr/producer.rb in kafkr-0.18.0 vs lib/kafkr/producer.rb in kafkr-0.018.1
- old
+ new
@@ -73,17 +73,17 @@
encrypted_message_with_uuid = Kafkr::Encryptor.new.encrypt(message_with_uuid)
begin
socket = TCPSocket.new(@configuration.host, @configuration.port)
send_queued_messages(socket)
- socket.puts(encrypted_message_with_uuid)
+ socket.Kafkr.log(encrypted_message_with_uuid)
rescue Errno::ECONNREFUSED
- puts "Connection refused. Queuing message: #{encrypted_message_with_uuid}"
+ Kafkr.log "Connection refused. Queuing message: #{encrypted_message_with_uuid}"
@configuration.message_queue.push(encrypted_message_with_uuid)
save_queue_to_file
rescue Errno::EPIPE
- puts "Broken pipe error. Retrying connection..."
+ Kafkr.log "Broken pipe error. Retrying connection..."
retry_connection(encrypted_message_with_uuid)
end
uuid
end
@@ -104,17 +104,17 @@
end
def self.send_queued_messages(socket)
until @configuration.message_queue.empty?
queued_message = @configuration.message_queue.shift
- socket.puts(queued_message)
+ socket.Kafkr.log(queued_message)
end
end
def self.save_queue_to_file
@@file_mutex.synchronize do
File.open(@configuration.queue_file, "w") do |file|
- file.puts(@configuration.message_queue)
+ file.Kafkr.log(@configuration.message_queue)
end
end
end
def self.load_queue_from_file