module Hat class Worker include Hat::Configurable attr_accessor :id, :logger has_configuration :worker, { middlewares: [], queue: { name: 'hat.direct' }.merge(config.queue) } class << self def queue(name) config.worker.queue.name = name end end def initialize(options) @options = options @connection = Bunny.new end def stop @stop = true end def run logger.debug 'Starting rabbit connection' @connection.start logger.debug 'Creating channel' @channel = @connection.create_channel logger.info "Subscribing to #{config.worker.queue.name}" queue.subscribe do |delivery_info, properties, payload| perform(*call(delivery_info, properties, payload)) end end def call(delivery_info, properties, payload) config.worker.middlwares.each { |m| delivery_info, properties, payload = m.call(delivery_info, properties, payload) } end private def exchange @exchange ||= @channel.direct('hat.direct') end def queue @queue ||= @channel.queue(config.worker.queue.name).bind(exchange, config.worker.queue.bind_options || {}) end end end