Sha256: 441596e87035a035f37f8e072dca2bb277cf83b12a2f44ed5a6211c6cdd01f6b
Contents?: true
Size: 1.87 KB
Versions: 2
Compression:
Stored size: 1.87 KB
Contents
module Phobos
  module Actions
    # Runs a single consumed message through the listener's handler class.
    #
    # When the handler raises a StandardError the failure is instrumented and
    # logged, the current thread sleeps for an interval obtained from the
    # listener's backoff object, and the message is processed again. This
    # repeats until processing succeeds or the listener reports that it
    # should stop.
    class ProcessMessage
      include Phobos::Instrumentation

      # @return [Hash] the listener metadata merged with this message's
      #   :key, :partition, :offset and the current :retry_count
      attr_reader :metadata

      # @param listener [Object] object supplying create_exponential_backoff,
      #   encoding, should_stop? and handler_class
      # @param message [Object] object supplying key, partition, offset and value
      # @param listener_metadata [Hash] base metadata; it is merged into a new
      #   hash together with the message details and a retry_count of 0
      def initialize(listener:, message:, listener_metadata:)
        @listener = listener
        @message = message

        message_details = {
          key: message.key,
          partition: message.partition,
          offset: message.offset,
          retry_count: 0
        }
        @metadata = listener_metadata.merge(message_details)
      end

      # Processes the message, retrying after each failure.
      #
      # @return [Object] whatever the instrumented processing block returns
      # @raise [Phobos::AbortError] when a failure occurred and the listener
      #   reports should_stop? after the wait
      def execute
        # Both are created once, outside the retried section, so every
        # attempt shares the same backoff object and encoded payload.
        backoff = @listener.create_exponential_backoff
        payload = force_encoding(@message.value)

        begin
          process_message(payload)
        rescue => e
          attempt = @metadata[:retry_count]
          wait_seconds = backoff.interval_at(attempt).round(2)

          failure = {
            waiting_time: wait_seconds,
            exception_class: e.class.name,
            exception_message: e.message,
            backtrace: e.backtrace
          }

          # The sleep happens inside the instrumented block, so the wait is
          # part of the 'listener.retry_handler_error' event.
          instrument('listener.retry_handler_error', failure.merge(@metadata)) do
            Phobos.logger.error do
              { message: "error processing message, waiting #{wait_seconds}s" }
                .merge(failure)
                .merge(@metadata)
            end

            sleep wait_seconds
          end

          # Checked only after the wait: give up instead of retrying when the
          # listener has been asked to stop.
          raise Phobos::AbortError if @listener.should_stop?

          @metadata[:retry_count] = attempt + 1
          retry
        end
      end

      private

      # Applies the listener's configured encoding to the value in place.
      #
      # @param value [String, nil] raw message value
      # @return [String, nil] the same value, re-tagged with the listener's
      #   encoding when one is configured; nil stays nil
      def force_encoding(value)
        return value unless @listener.encoding

        value&.force_encoding(@listener.encoding)
      end

      # Instantiates a fresh handler and feeds it the payload inside the
      # 'listener.process_message' instrumentation event.
      #
      # @param payload [String, nil] the (possibly re-encoded) message value
      # @return [Object] the result of the instrumented block
      def process_message(payload)
        instrument('listener.process_message', @metadata) do
          consumer = @listener.handler_class.new
          prepared = consumer.before_consume(payload)

          # around_consume is invoked on the handler class, while consume is
          # invoked on the instance created above.
          @listener.handler_class.around_consume(prepared, @metadata) do
            consumer.consume(prepared, @metadata)
          end
        end
      end
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
phobos-1.7.2 | lib/phobos/actions/process_message.rb |
phobos-1.7.1 | lib/phobos/actions/process_message.rb |