lib/kafkr/consumer.rb in kafkr-0.13.0 vs lib/kafkr/consumer.rb in kafkr-0.18.0

- old
+ new

@@ -1,6 +1,6 @@ -require "socket" +require "socket" require "timeout" require "ostruct" require "fileutils" require "json" @@ -17,12 +17,12 @@ def configuration FileUtils.mkdir_p "./.kafkr" @configuration ||= OpenStruct.new @configuration.host = ENV.fetch("KAFKR_HOST", "localhost") - @configuration.port = ENV.fetch("KAFKR_PORT", 2000) - @configuration.timeout = ENV.fetch("KAFKR_CONSUMER_TIMEOUT", 120) + @configuration.port = ENV.fetch("KAFKR_PORT", 4000) + @configuration.timeout = ENV.fetch("KAFKR_CONSUMER_TIMEOUT", 300) @configuration.suggest_handlers = false @configuration end def configure @@ -158,20 +158,17 @@ loop do received_message = socket.gets raise LostConnection if received_message.nil? received_message = Kafkr::Encryptor.new.decrypt(received_message.chomp) - if valid_json?(received_message) - payload = yield JSON.parse(received_message), sync_uid if block_given? - return payload if payload - end + payload = yield received_message, sync_uid if block_given? + return payload end end rescue Timeout::Error, LostConnection, Errno::ECONNREFUSED attempt += 1 wait_time = backoff_time(attempt) - puts "Attempt #{attempt}: Retrying in #{wait_time} seconds..." sleep(wait_time) retry rescue Interrupt puts "Received interrupt signal. Shutting down consumer gracefully..." socket&.close @@ -180,11 +177,12 @@ end def listen attempt = 0 loop do - listen_for("dummy", ->(msg) { puts "Listening..." }) do |message| - puts "Received message: #{message}" + listen_for("dummy", ->(msg) { }) do |message| + Kafkr.log ">> #{message}" + dispatch_to_handlers(message) end end end def valid_json?(json)