lib/kafkr/producer.rb in kafkr-0.5.7 vs lib/kafkr/producer.rb in kafkr-0.9.1
- old
+ new
@@ -19,10 +19,11 @@
@configuration.acknowledged_file = ACKNOWLEDGED_MESSAGE_QUEUE
@configuration.message_queue = []
@configuration.acknowledged_messages = []
@configuration.acknowledged_messages = load_acknowledged_messages
load_queue_from_file
+ @configuration.is_json = false
@configuration
end
def self.configure
yield(configuration)
@@ -70,18 +71,26 @@
end
end
def self.send_message(message, acknowledge: true)
uuid = SecureRandom.uuid
+
+ message_with_uuid = nil
- if message.is_a? String
- message = structured_data_to_hash(input: message, sync_uid: uuid)
- message_with_uuid = "#{uuid}: #{message}"
- end
+ if Kafkr::Producer.configuration.is_json
+ json_message = JSON.parse(message)
+ json_message["uuid"] = uuid
+ message_with_uuid = JSON.dump(json_message)
+ else
+ if message.is_a? String
+ message = structured_data_to_hash(input: message, sync_uid: uuid)
+ message_with_uuid = "#{uuid}: #{message}"
+ end
- if message.is_a?(Hash)
- message_with_uuid = "#{uuid}: #{JSON.generate(message)}"
+ if message.is_a?(Hash)
+ message_with_uuid = "#{uuid}: #{JSON.generate(message)}"
+ end
end
# Encrypt the message here
encrypted_message_with_uuid = Kafkr::Encryptor.new.encrypt(message_with_uuid)
@@ -114,20 +123,17 @@
uuid
end
def self.send_message_and_wait(message)
# Using method(:send_message) to pass the send_message method as a callable object
-
- payload = Consumer.new.listen_for(message, self.method(:send_message)) do |received_message,sync_uid|
+
+ Consumer.new.listen_for(message, method(:send_message)) do |received_message, sync_uid|
if received_message.key? "reply"
- if received_message["reply"].dig('uuid') == sync_uid
- received_message["reply"].dig('payload')
+ if received_message["reply"].dig("uuid") == sync_uid
+ received_message["reply"].dig("payload")
end
end
-
end
-
- payload
end
private
def self.listen_for_acknowledgments(socket)