Sha256: 826bbb02a40c0b96259039f43014f391c4c4ec20798c809416e010f92b040423

Contents?: true

Size: 1.79 KB

Versions: 79

Compression:

Stored size: 1.79 KB

Contents

# frozen_string_literal: true

module Msgr
  # The Dispatcher receives incoming messages, posts them to its pool and
  # delegates each one to a new, fresh consumer instance.
  #
  # The pool implementation is taken from +config[:pool_class]+ and defaults
  # to the synchronous NullPool defined below.
  #
  class Dispatcher
    include Logging

    attr_reader :config, :pool

    # Builds the dispatcher and its pool.
    #
    # NOTE: the +:pool_class+ default is written into the passed +config+
    # object itself, so the caller's hash is modified in place.
    #
    # @param config [Hash] dispatcher options; the keys read in this class are
    #   +:pool_class+ (class name as a string) and +:raise_exceptions+.
    def initialize(config)
      config[:pool_class] ||= 'Msgr::Dispatcher::NullPool'

      log(:debug) do
        "Initialize new dispatcher (#{config[:pool_class]}: #{config})..."
      end

      @config = config
      @pool = config[:pool_class].constantize.new config
    end

    # Hands the message to the pool; the pool decides when the block that
    # dispatches the message is run.
    #
    # @param message the incoming message to process
    def call(message)
      pool.post(message) do |msg|
        dispatch msg
      end
    end

    # Resolves the consumer class named by +message.route.consumer+, runs a
    # new instance of it against the message and settles the message:
    #
    # * on success the message is acked, unless it was already acked or the
    #   consumer class has auto-ack disabled;
    # * on a StandardError the message is nacked (unless already acked), the
    #   error is logged and, if +config[:raise_exceptions]+ is set, re-raised;
    # * in every case an active ActiveRecord connection (when ActiveRecord is
    #   loaded) is released afterwards.
    #
    # @param message the message to dispatch
    #
    # rubocop:disable Metrics/AbcSize
    # rubocop:disable Metrics/MethodLength
    # rubocop:disable Metrics/CyclomaticComplexity
    def dispatch(message)
      consumer_class = Object.const_get message.route.consumer

      log(:debug) { "Dispatch message to #{consumer_class.name}" }

      consumer_class.new.dispatch message

      # Acknowledge message unless it is already acknowledged or auto_ack is disabled.
      message.ack if !message.acked? && consumer_class.auto_ack?
    rescue => error
      message.nack unless message.acked?

      log(:error) do
        # Array() guards against exceptions that report no backtrace, so
        # logging can never mask the original error.
        backtrace = Array(error.backtrace).join("\n")
        "Dispatcher error: #{error.class.name}: #{error}\n#{backtrace}"
      end

      raise error if config[:raise_exceptions]
    ensure
      if defined?(ActiveRecord) &&
         ActiveRecord::Base.connection_pool.active_connection?
        log(:debug) { 'Release used AR connection for dispatcher thread.' }
        ActiveRecord::Base.connection_pool.release_connection
      end
    end
    # rubocop:enable Metrics/AbcSize
    # rubocop:enable Metrics/MethodLength
    # rubocop:enable Metrics/CyclomaticComplexity

    # No-op: the dispatcher itself holds nothing that needs to be shut down.
    def shutdown; end

    # @return [String] the class name
    def to_s
      self.class.name
    end

    # Default pool: accepts and ignores any constructor arguments and runs
    # posted work immediately by yielding to the given block.
    class NullPool
      def initialize(*); end

      # Yields all given arguments straight to the block.
      def post(*args)
        yield(*args)
      end
    end
  end
end

Version data entries

79 entries across 79 versions & 1 rubygems

Version Path
msgr-1.2.0 lib/msgr/dispatcher.rb
msgr-1.1.0.1.b306 lib/msgr/dispatcher.rb
msgr-1.1.0.1.b305 lib/msgr/dispatcher.rb
msgr-1.1.0.1.b302 lib/msgr/dispatcher.rb
msgr-1.1.0.1.b301 lib/msgr/dispatcher.rb
msgr-1.1.0.1.b300 lib/msgr/dispatcher.rb
msgr-1.1.0.1.b297 lib/msgr/dispatcher.rb
msgr-1.1.0.1.b296 lib/msgr/dispatcher.rb
msgr-1.1.0.1.b295 lib/msgr/dispatcher.rb
msgr-1.1.0.1.b292 lib/msgr/dispatcher.rb
msgr-1.1.0.1.b291 lib/msgr/dispatcher.rb
msgr-1.1.0.1.b288 lib/msgr/dispatcher.rb
msgr-1.1.0.1.b285 lib/msgr/dispatcher.rb
msgr-1.1.0.1.b263 lib/msgr/dispatcher.rb
msgr-1.1.0.1.b249 lib/msgr/dispatcher.rb
msgr-1.1.0.1.b248 lib/msgr/dispatcher.rb
msgr-1.1.0.1.b244 lib/msgr/dispatcher.rb
msgr-1.1.0.1.b241 lib/msgr/dispatcher.rb
msgr-1.1.0.1.b240 lib/msgr/dispatcher.rb
msgr-1.1.0.1.b239 lib/msgr/dispatcher.rb