lib/kafkr/producer.rb in kafkr-0.018.1 vs lib/kafkr/producer.rb in kafkr-0.21.0
- old
+ new
@@ -15,69 +15,38 @@
FileUtils.mkdir_p "./.kafkr"
@configuration ||= OpenStruct.new
@configuration.queue_file = MESSAGE_QUEUE
@configuration.message_queue = []
load_queue_from_file
- @configuration.is_json = false
+ @configuration.is_json = true
@configuration
end
def self.configure
yield(configuration)
rescue => e
logger.error("Configuration error: #{e.message}")
end
- def self.structured_data_to_hash(input:, sync_uid:)
- unless /\A\w+\s*(=>|<=>)\s*((\w+:\s*['"]?[^'",]*['"]?,\s*)*(\w+:\s*['"]?[^'",]*['"]?)\s*)\z/.match?(input)
- return input
- end
-
- if input.include?("<=>")
- type, key_values_str = input.split("<=>").map(&:strip)
- key_values = key_values_str.scan(/(\w+):\s*['"]?([^'",]*)['"]?/)
- hash_body = key_values.to_h do |key, value|
- [key.to_sym, value.strip.gsub(/\A['"]|['"]\z/, "")]
- end
- {type.to_sym => hash_body, :sync => true, :sync_uid => sync_uid}
- else
- type, key_values_str = input.split("=>").map(&:strip)
- key_values = key_values_str.scan(/(\w+):\s*['"]?([^'",]*)['"]?/)
- hash_body = key_values.to_h do |key, value|
- [key.to_sym, value.strip.gsub(/\A['"]|['"]\z/, "")]
- end
- {type.to_sym => hash_body}
- end
- end
-
def self.send_message(message)
return if message.nil? || message.empty?
uuid = SecureRandom.uuid
message_with_uuid = nil
if Kafkr::Producer.configuration.is_json
json_message = JSON.parse(message)
json_message["uuid"] = uuid
message_with_uuid = JSON.dump(json_message)
- else
- if message.is_a? String
- message = structured_data_to_hash(input: message, sync_uid: uuid)
- message_with_uuid = "#{uuid}: #{message}"
- end
-
- if message.is_a?(Hash)
- message_with_uuid = "#{uuid}: #{JSON.generate(message)}"
- end
end
encrypted_message_with_uuid = Kafkr::Encryptor.new.encrypt(message_with_uuid)
begin
socket = TCPSocket.new(@configuration.host, @configuration.port)
send_queued_messages(socket)
- socket.Kafkr.log(encrypted_message_with_uuid)
+ socket.puts (encrypted_message_with_uuid)
rescue Errno::ECONNREFUSED
Kafkr.log "Connection refused. Queuing message: #{encrypted_message_with_uuid}"
@configuration.message_queue.push(encrypted_message_with_uuid)
save_queue_to_file
rescue Errno::EPIPE
@@ -104,18 +73,18 @@
end
def self.send_queued_messages(socket)
until @configuration.message_queue.empty?
queued_message = @configuration.message_queue.shift
- socket.Kafkr.log(queued_message)
+ socket.puts(queued_message)
end
end
def self.save_queue_to_file
@@file_mutex.synchronize do
File.open(@configuration.queue_file, "w") do |file|
- file.Kafkr.log(@configuration.message_queue)
+ file.puts(@configuration.message_queue)
end
end
end
def self.load_queue_from_file
@@ -124,10 +93,7 @@
@configuration.message_queue = File.readlines(@configuration.queue_file).map(&:chomp)
end
end
end
- def self.logger
- @logger ||= Logger.new(STDOUT)
- end
end
end