Sha256: e63bafe9c9c4ce6e9968dd567cd72a31e1902e328053c86ec28420ec1a68005b

Contents?: true

Size: 1.19 KB

Versions: 14

Compression:

Stored size: 1.19 KB

Contents

# frozen_string_literal: true

require 'phobos/processor'

module Phobos
  module Actions
    class ProcessMessage
      include Phobos::Processor

      attr_reader :metadata

      def initialize(listener:, message:, listener_metadata:)
        @listener = listener
        @message = message
        @metadata = listener_metadata.merge(
          key: message.key,
          partition: message.partition,
          offset: message.offset,
          retry_count: 0,
          headers: message.headers
        )
      end

      def execute
        payload = force_encoding(@message.value)

        begin
          process_message(payload)
        rescue StandardError => e
          handle_error(e, 'listener.retry_handler_error',
                       "error processing message, waiting #{backoff_interval}s")
          retry
        end
      end

      private

      def process_message(payload)
        instrument('listener.process_message', @metadata) do
          handler = @listener.handler_class.new

          handler.around_consume(payload, @metadata) do |around_payload, around_metadata|
            handler.consume(around_payload, around_metadata)
          end
        end
      end
    end
  end
end

Version data entries

14 entries across 14 versions & 2 rubygems

Version Path
phobos-2.1.6 lib/phobos/actions/process_message.rb
phobos-2.1.5 lib/phobos/actions/process_message.rb
phobos-2.1.4 lib/phobos/actions/process_message.rb
phobos-2.1.3 lib/phobos/actions/process_message.rb
phobos-2.1.2 lib/phobos/actions/process_message.rb
phobos-2.1.1 lib/phobos/actions/process_message.rb
phobos_temp_fork-0.0.4 lib/phobos/actions/process_message.rb
phobos_temp_fork-0.0.3 lib/phobos/actions/process_message.rb
phobos_temp_fork-0.0.2 lib/phobos/actions/process_message.rb
phobos_temp_fork-0.0.1 lib/phobos/actions/process_message.rb
phobos-2.1.0 lib/phobos/actions/process_message.rb
phobos-2.0.2 lib/phobos/actions/process_message.rb
phobos-2.0.1 lib/phobos/actions/process_message.rb
phobos-2.0.0.pre.beta1 lib/phobos/actions/process_message.rb