Sha256: 78c0d0bff2dd1f5279fb9608675c6c6a1ae899cab9b6a2168d1858d68ab67ab0

Contents?: true

Size: 1.29 KB

Versions: 6

Compression:

Stored size: 1.29 KB

Contents

require 'concurrent/cached_thread_pool'

module Msgr

  # The Dispatcher receives incoming messages,
  # process them through a middleware stack and
  # delegate them to a new and fresh consumer instance.
  #
  class Dispatcher
    include Logging

    attr_reader :pool

    def initialize(config)
      log(:info) { "Initialize new dispatcher (#{config[:max]} threads)..." }

      @pool = ::Concurrent::CachedThreadPool.new(max: config[:max])
    end

    def call(message)
      pool.post(message) do |message|
        dispatch message
      end
    end

    def dispatch(message)
      consumer_class = Object.const_get message.route.consumer

      log(:debug) { "Dispatch message to #{consumer_class.name}" }

      consumer_class.new.dispatch message

      # Acknowledge message unless it is already acknowledged.
      message.ack unless message.acked?
    rescue => error
      log(:error) do
        "Dispatcher error: #{error.class.name}: #{error}\n" +
            error.backtrace.join("\n")
      end
    ensure
      if defined?(ActiveRecord) && ActiveRecord::Base.active_connection?
        log(:debug) { 'Release used AR connection for dispatcher thread.' }
        ActiveRecord::Base.release_connection
      end
    end

    def shutdown

    end

    def to_s
      self.class.name
    end
  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
msgr-0.10.1 lib/msgr/dispatcher.rb
msgr-0.10.0 lib/msgr/dispatcher.rb
msgr-0.9.0 lib/msgr/dispatcher.rb
msgr-0.8.0 lib/msgr/dispatcher.rb
msgr-0.7.0 lib/msgr/dispatcher.rb
msgr-0.6.0 lib/msgr/dispatcher.rb