lib/kafkr/producer.rb in kafkr-0.018.1 vs lib/kafkr/producer.rb in kafkr-0.21.0

- old
+ new

@@ -15,69 +15,38 @@ FileUtils.mkdir_p "./.kafkr" @configuration ||= OpenStruct.new @configuration.queue_file = MESSAGE_QUEUE @configuration.message_queue = [] load_queue_from_file - @configuration.is_json = false + @configuration.is_json = true @configuration end def self.configure yield(configuration) rescue => e logger.error("Configuration error: #{e.message}") end - def self.structured_data_to_hash(input:, sync_uid:) - unless /\A\w+\s*(=>|<=>)\s*((\w+:\s*['"]?[^'",]*['"]?,\s*)*(\w+:\s*['"]?[^'",]*['"]?)\s*)\z/.match?(input) - return input - end - - if input.include?("<=>") - type, key_values_str = input.split("<=>").map(&:strip) - key_values = key_values_str.scan(/(\w+):\s*['"]?([^'",]*)['"]?/) - hash_body = key_values.to_h do |key, value| - [key.to_sym, value.strip.gsub(/\A['"]|['"]\z/, "")] - end - {type.to_sym => hash_body, :sync => true, :sync_uid => sync_uid} - else - type, key_values_str = input.split("=>").map(&:strip) - key_values = key_values_str.scan(/(\w+):\s*['"]?([^'",]*)['"]?/) - hash_body = key_values.to_h do |key, value| - [key.to_sym, value.strip.gsub(/\A['"]|['"]\z/, "")] - end - {type.to_sym => hash_body} - end - end - def self.send_message(message) return if message.nil? || message.empty? uuid = SecureRandom.uuid message_with_uuid = nil if Kafkr::Producer.configuration.is_json json_message = JSON.parse(message) json_message["uuid"] = uuid message_with_uuid = JSON.dump(json_message) - else - if message.is_a? String - message = structured_data_to_hash(input: message, sync_uid: uuid) - message_with_uuid = "#{uuid}: #{message}" - end - - if message.is_a?(Hash) - message_with_uuid = "#{uuid}: #{JSON.generate(message)}" - end end encrypted_message_with_uuid = Kafkr::Encryptor.new.encrypt(message_with_uuid) begin socket = TCPSocket.new(@configuration.host, @configuration.port) send_queued_messages(socket) - socket.Kafkr.log(encrypted_message_with_uuid) + socket.puts (encrypted_message_with_uuid) rescue Errno::ECONNREFUSED Kafkr.log "Connection refused. Queuing message: #{encrypted_message_with_uuid}" @configuration.message_queue.push(encrypted_message_with_uuid) save_queue_to_file rescue Errno::EPIPE @@ -104,18 +73,18 @@ end def self.send_queued_messages(socket) until @configuration.message_queue.empty? queued_message = @configuration.message_queue.shift - socket.Kafkr.log(queued_message) + socket.puts(queued_message) end end def self.save_queue_to_file @@file_mutex.synchronize do File.open(@configuration.queue_file, "w") do |file| - file.Kafkr.log(@configuration.message_queue) + file.puts(@configuration.message_queue) end end end def self.load_queue_from_file @@ -124,10 +93,7 @@ @configuration.message_queue = File.readlines(@configuration.queue_file).map(&:chomp) end end end - def self.logger - @logger ||= Logger.new(STDOUT) - end end end