lib/kafkr/log.rb in kafkr-0.9.1 vs lib/kafkr/log.rb in kafkr-0.10.0

- old
+ new

@@ -6,26 +6,21 @@ def initialize(port) @server = TCPServer.new(port) @received_file = "./.kafkr/log.txt" @broker = MessageBroker.new @whitelist = load_whitelist - @acknowledged_message_ids = load_acknowledged_message_ids end - def load_acknowledged_message_ids - unless File.exist?("./.kafkr/acknowledged_message_ids.txt") - `mkdir -p ./.kafkr` - `touch ./.kafkr/acknowledged_message_ids.txt` + def load_whitelist + whitelist = ["localhost", "::1", "127.0.0.1"] + if File.exist?("whitelist.txt") + File.readlines("whitelist.txt").each do |line| + ip = line.strip.sub(/^::ffff:/, "") + whitelist << ip + end end - - config_path = File.expand_path("./.kafkr/acknowledged_message_ids.txt") - return [] unless File.exist?(config_path) - - File.readlines(config_path).map(&:strip) - rescue Errno::ENOENT, Errno::EACCES => e - puts "Error loading acknowledged_message_ids: #{e.message}" - [] + whitelist end def start loop do client = @server.accept @@ -51,18 +46,11 @@ else decryptor = Kafkr::Encryptor.new message = decryptor.decrypt(encrypted_message.chomp) # Decrypt the message here uuid, message_content = extract_uuid(message) if uuid && message_content - if @acknowledged_message_ids.include?(uuid) - acknowledge_existing_message(uuid, client) - else - acknowledge_message(uuid, client) - persist_received_message(uuid) - @acknowledged_message_ids << uuid - @broker.broadcast(message_content) - end + @broker.broadcast(message_content) else puts "Received invalid message format: #{message}" end end rescue Errno::ECONNRESET @@ -71,59 +59,24 @@ end end end end - def load_whitelist - whitelist = ["localhost", "::1", "127.0.0.1"] - if File.exist?("whitelist.txt") - File.readlines("whitelist.txt").each do |line| - ip = line.strip.sub(/^::ffff:/, "") - whitelist << ip - end - end - whitelist - end - def whitelisted?(ip) @whitelist.include?(ip.gsub("::ffff:", "")) end private def extract_uuid(message) - - #check if message if valid json + # Check if message is valid JSON begin message = JSON.parse(message) - return message["uuid"], message - rescue JSON::ParserError => e puts "Received invalid message format: #{message}" match_data = /^(\w{8}-\w{4}-\w{4}-\w{4}-\w{12}): (.+)$/.match(message) match_data ? [match_data[1], match_data[2]] : [nil, nil] - end - - end - - def acknowledge_message(uuid, client) - puts "Received message with UUID #{uuid}. Acknowledged." - acknowledgment_message = "ACK: #{uuid}" - client.puts(acknowledgment_message) - puts "Acknowledgment sent to producer: #{acknowledgment_message}" - end - - def acknowledge_existing_message(uuid, client) - puts "Received duplicate message with UUID #{uuid}. Already Acknowledged." - acknowledgment_message = "ACK-DUPLICATE: #{uuid}" - client.puts(acknowledgment_message) - puts "Duplicate acknowledgment sent to producer: #{acknowledgment_message}" - end - - def persist_received_message(uuid) - File.open("./.kafkr/acknowledged_message_ids.txt", "a") do |file| - file.puts(uuid) end end end end