lib/kafkr/consumer.rb in kafkr-0.13.0 vs lib/kafkr/consumer.rb in kafkr-0.18.0
- old
+ new
@@ -1,6 +1,6 @@
-require "socket"
+require "socket"
require "timeout"
require "ostruct"
require "fileutils"
require "json"
@@ -17,12 +17,12 @@
def configuration
FileUtils.mkdir_p "./.kafkr"
@configuration ||= OpenStruct.new
@configuration.host = ENV.fetch("KAFKR_HOST", "localhost")
- @configuration.port = ENV.fetch("KAFKR_PORT", 2000)
- @configuration.timeout = ENV.fetch("KAFKR_CONSUMER_TIMEOUT", 120)
+ @configuration.port = ENV.fetch("KAFKR_PORT", 4000)
+ @configuration.timeout = ENV.fetch("KAFKR_CONSUMER_TIMEOUT", 300)
@configuration.suggest_handlers = false
@configuration
end
def configure
@@ -158,20 +158,17 @@
loop do
received_message = socket.gets
raise LostConnection if received_message.nil?
received_message = Kafkr::Encryptor.new.decrypt(received_message.chomp)
- if valid_json?(received_message)
- payload = yield JSON.parse(received_message), sync_uid if block_given?
- return payload if payload
- end
+ payload = yield received_message, sync_uid if block_given?
+ return payload
end
end
rescue Timeout::Error, LostConnection, Errno::ECONNREFUSED
attempt += 1
wait_time = backoff_time(attempt)
- puts "Attempt #{attempt}: Retrying in #{wait_time} seconds..."
sleep(wait_time)
retry
rescue Interrupt
puts "Received interrupt signal. Shutting down consumer gracefully..."
socket&.close
@@ -180,11 +177,12 @@
end
def listen
attempt = 0
loop do
- listen_for("dummy", ->(msg) { puts "Listening..." }) do |message|
- puts "Received message: #{message}"
+ listen_for("dummy", ->(msg) { }) do |message|
+ Kafkr.log ">> #{message}"
+ dispatch_to_handlers(message)
end
end
end
def valid_json?(json)