lib/kafkr/producer.rb in kafkr-0.5.7 vs lib/kafkr/producer.rb in kafkr-0.9.1

- old
+ new

@@ -19,10 +19,11 @@ @configuration.acknowledged_file = ACKNOWLEDGED_MESSAGE_QUEUE @configuration.message_queue = [] @configuration.acknowledged_messages = [] @configuration.acknowledged_messages = load_acknowledged_messages load_queue_from_file + @configuration.is_json = false @configuration end def self.configure yield(configuration) @@ -70,18 +71,26 @@ end end def self.send_message(message, acknowledge: true) uuid = SecureRandom.uuid + + message_with_uuid = nil - if message.is_a? String - message = structured_data_to_hash(input: message, sync_uid: uuid) - message_with_uuid = "#{uuid}: #{message}" - end + if Kafkr::Producer.configuration.is_json + json_message = JSON.parse(message) + json_message["uuid"] = uuid + message_with_uuid = JSON.dump(json_message) + else + if message.is_a? String + message = structured_data_to_hash(input: message, sync_uid: uuid) + message_with_uuid = "#{uuid}: #{message}" + end - if message.is_a?(Hash) - message_with_uuid = "#{uuid}: #{JSON.generate(message)}" + if message.is_a?(Hash) + message_with_uuid = "#{uuid}: #{JSON.generate(message)}" + end end # Encrypt the message here encrypted_message_with_uuid = Kafkr::Encryptor.new.encrypt(message_with_uuid) @@ -114,20 +123,17 @@ uuid end def self.send_message_and_wait(message) # Using method(:send_message) to pass the send_message method as a callable object - - payload = Consumer.new.listen_for(message, self.method(:send_message)) do |received_message,sync_uid| + + Consumer.new.listen_for(message, method(:send_message)) do |received_message, sync_uid| if received_message.key? "reply" - if received_message["reply"].dig('uuid') == sync_uid - received_message["reply"].dig('payload') + if received_message["reply"].dig("uuid") == sync_uid + received_message["reply"].dig("payload") end end - end - - payload end private def self.listen_for_acknowledgments(socket)