lib/kafkr/producer.rb in kafkr-0.9.1 vs lib/kafkr/producer.rb in kafkr-0.10.0
- old
+ new
@@ -8,20 +8,16 @@
module Kafkr
module Producer
@@file_mutex = Mutex.new
MESSAGE_QUEUE = "./.kafkr/message_queue.txt"
- ACKNOWLEDGED_MESSAGE_QUEUE = "./.kafkr/acknowledged_messages.txt"
def self.configuration
FileUtils.mkdir_p "./.kafkr"
@configuration ||= OpenStruct.new
@configuration.queue_file = MESSAGE_QUEUE
- @configuration.acknowledged_file = ACKNOWLEDGED_MESSAGE_QUEUE
@configuration.message_queue = []
- @configuration.acknowledged_messages = []
- @configuration.acknowledged_messages = load_acknowledged_messages
load_queue_from_file
@configuration.is_json = false
@configuration
end
@@ -30,52 +26,33 @@
rescue => e
logger.error("Configuration error: #{e.message}")
end
def self.structured_data_to_hash(input:, sync_uid:)
- # Check the overall structure with regex and make quotes optional
unless /\A\w+\s*(=>|<=>)\s*((\w+:\s*['"]?[^'",]*['"]?,\s*)*(\w+:\s*['"]?[^'",]*['"]?)\s*)\z/.match?(input)
return input
end
if input.include?("<=>")
- # puts "sync message"
- # Extract the type and key-value pairs
type, key_values_str = input.split("<=>").map(&:strip)
-
- # puts type
- # puts key_values_str
-
key_values = key_values_str.scan(/(\w+):\s*['"]?([^'",]*)['"]?/)
-
- # Convert the array of pairs into a hash, stripping quotes if they exist
hash_body = key_values.to_h do |key, value|
[key.to_sym, value.strip.gsub(/\A['"]|['"]\z/, "")]
end
-
- # Return the final hash with the type as the key
{type.to_sym => hash_body, :sync => true, :sync_uid => sync_uid}
-
else
- # puts "async message"
- # Extract the type and key-value pairs
type, key_values_str = input.split("=>").map(&:strip)
key_values = key_values_str.scan(/(\w+):\s*['"]?([^'",]*)['"]?/)
-
- # Convert the array of pairs into a hash, stripping quotes if they exist
hash_body = key_values.to_h do |key, value|
[key.to_sym, value.strip.gsub(/\A['"]|['"]\z/, "")]
end
-
- # Return the final hash with the type as the key
{type.to_sym => hash_body}
end
end
- def self.send_message(message, acknowledge: true)
+ def self.send_message(message)
uuid = SecureRandom.uuid
-
message_with_uuid = nil
if Kafkr::Producer.configuration.is_json
json_message = JSON.parse(message)
json_message["uuid"] = uuid
@@ -89,32 +66,18 @@
if message.is_a?(Hash)
message_with_uuid = "#{uuid}: #{JSON.generate(message)}"
end
end
- # Encrypt the message here
encrypted_message_with_uuid = Kafkr::Encryptor.new.encrypt(message_with_uuid)
begin
- if acknowledge
- if !@configuration.acknowledged_messages.include?(uuid)
- socket = TCPSocket.new(@configuration.host, @configuration.port)
- listen_for_acknowledgments(socket) if acknowledge
- send_queued_messages(socket)
- # Send the encrypted message instead of the plain one
- socket.puts(encrypted_message_with_uuid)
- else
- puts "Message with UUID #{uuid} has already been acknowledged. Skipping."
- end
- else
- socket = TCPSocket.new(@configuration.host, @configuration.port)
- send_queued_messages(socket)
- socket.puts(encrypted_message_with_uuid)
- end
+ socket = TCPSocket.new(@configuration.host, @configuration.port)
+ send_queued_messages(socket)
+ socket.puts(encrypted_message_with_uuid)
rescue Errno::ECONNREFUSED
puts "Connection refused. Queuing message: #{encrypted_message_with_uuid}"
- # Queue the encrypted message
@configuration.message_queue.push(encrypted_message_with_uuid)
save_queue_to_file
rescue Errno::EPIPE
puts "Broken pipe error. Retrying connection..."
retry_connection(encrypted_message_with_uuid)
@@ -122,40 +85,19 @@
uuid
end
def self.send_message_and_wait(message)
- # Using method(:send_message) to pass the send_message method as a callable object
-
Consumer.new.listen_for(message, method(:send_message)) do |received_message, sync_uid|
- if received_message.key? "reply"
- if received_message["reply"].dig("uuid") == sync_uid
- received_message["reply"].dig("payload")
- end
+ if received_message.key? "reply" and received_message["reply"].dig("uuid") == sync_uid
+ received_message["reply"].dig("payload")
end
end
end
private
- def self.listen_for_acknowledgments(socket)
- Thread.new do
- while line = socket.gets
- line = line.chomp
- if line.start_with?("ACK:")
- uuid = line.split(" ")[1]
- handle_acknowledgment(uuid)
- end
- end
- end
- end
-
- def self.handle_acknowledgment(uuid)
- @configuration.acknowledged_messages << uuid
- save_acknowledged_messages
- end
-
def self.retry_connection(message_with_uuid)
sleep(5)
send_message(message_with_uuid)
end
@@ -176,27 +118,9 @@
def self.load_queue_from_file
@@file_mutex.synchronize do
if File.exist?(@configuration.queue_file)
@configuration.message_queue = File.readlines(@configuration.queue_file).map(&:chomp)
- end
- end
- end
-
- def self.load_acknowledged_messages
- @@file_mutex.synchronize do
- if File.exist?(@configuration.acknowledged_file)
- File.readlines(@configuration.acknowledged_file).map(&:chomp)
- else
- []
- end
- end
- end
-
- def self.save_acknowledged_messages
- @@file_mutex.synchronize do
- File.open(@configuration.acknowledged_file, "w") do |file|
- file.puts(@configuration.acknowledged_messages)
end
end
end
def self.logger