Sha256: 441596e87035a035f37f8e072dca2bb277cf83b12a2f44ed5a6211c6cdd01f6b

Contents?: true

Size: 1.87 KB

Versions: 2

Compression:

Stored size: 1.87 KB

Contents

module Phobos
  module Actions
    # Hands a single message to the listener's handler and keeps retrying
    # while the handler raises. The wait between attempts comes from the
    # backoff object returned by +listener.create_exponential_backoff+.
    # The retry loop has no upper bound; the only way out on failure is
    # Phobos::AbortError, raised once the listener reports it should stop.
    class ProcessMessage
      include Phobos::Instrumentation

      # @return [Hash] the listener metadata merged with this message's
      #   key, partition, offset and the current +retry_count+
      attr_reader :metadata

      # @param listener the listener that owns this message; it is asked for
      #   +create_exponential_backoff+, +encoding+, +handler_class+ and
      #   +should_stop?+
      # @param message the message to process; must respond to +key+,
      #   +partition+, +offset+ and +value+
      # @param listener_metadata [Hash] base metadata; it is merged into a new
      #   hash, the argument itself is not modified
      def initialize(listener:, message:, listener_metadata:)
        @listener = listener
        @message = message

        message_details = {
          key: message.key,
          partition: message.partition,
          offset: message.offset,
          retry_count: 0
        }
        @metadata = listener_metadata.merge(message_details)
      end

      # Processes the message, retrying after every StandardError.
      #
      # Each failure is instrumented and logged, then the thread sleeps for
      # the backoff interval belonging to the current retry count. After the
      # sleep the listener is asked whether it should stop; if not, the retry
      # count in +metadata+ is incremented and processing starts over.
      #
      # @raise [Phobos::AbortError] when an attempt failed and the listener
      #   reports +should_stop?+ after the wait
      def execute
        backoff = @listener.create_exponential_backoff
        payload = force_encoding(@message.value)

        begin
          process_message(payload)
        rescue => e
          attempts = @metadata[:retry_count]
          wait = backoff.interval_at(attempts).round(2)

          report_failure_and_wait(e, wait)

          # Checked only after the wait, so a stop request is noticed before
          # the next attempt rather than before the sleep.
          raise Phobos::AbortError if @listener.should_stop?

          @metadata[:retry_count] = attempts + 1
          retry
        end
      end

      private

      # Instruments and logs a failed attempt, then sleeps for +interval+
      # seconds. The sleep happens inside the instrumented block.
      #
      # @param exception [StandardError] the error raised while processing
      # @param interval [Numeric] seconds to wait before the next attempt
      def report_failure_and_wait(exception, interval)
        details = {
          waiting_time: interval,
          exception_class: exception.class.name,
          exception_message: exception.message,
          backtrace: exception.backtrace
        }

        instrument('listener.retry_handler_error', details.merge(@metadata)) do
          Phobos.logger.error do
            { message: "error processing message, waiting #{interval}s" }.merge(details).merge(@metadata)
          end

          sleep interval
        end
      end

      # Applies the listener's encoding to +value+, when one is configured.
      # Note that String#force_encoding changes the receiver in place.
      #
      # @param value [String, nil] raw message value
      # @return [String, nil] +value+ itself, re-tagged when an encoding is set
      def force_encoding(value)
        return value unless @listener.encoding

        value&.force_encoding(@listener.encoding)
      end

      # Runs one processing attempt inside the 'listener.process_message'
      # instrumentation event: a fresh handler instance is built, the payload
      # goes through +before_consume+, and +consume+ is invoked from within
      # the handler class's +around_consume+ block.
      #
      # @param payload [String, nil] the (possibly re-encoded) message value
      def process_message(payload)
        instrument('listener.process_message', @metadata) do
          consumer = @listener.handler_class.new
          prepared = consumer.before_consume(payload)

          @listener.handler_class.around_consume(prepared, @metadata) { consumer.consume(prepared, @metadata) }
        end
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
phobos-1.7.2 lib/phobos/actions/process_message.rb
phobos-1.7.1 lib/phobos/actions/process_message.rb