Sha256: 83e4f47d71457316954702130f651a24477ba84f8bb9a28dde2a17c97ff02963

Contents?: true

Size: 1.82 KB

Versions: 1

Compression:

Stored size: 1.82 KB

Contents

module Push
  # Backends for test and development environments
  module Backend
    class AMQP
      include Push::Logging

      def publish(message, exchange)
        EM.reactor_running? ? async_publish(message, exchange) : sync_publish(message, exchange)
        logger.debug "Published '#{message}' to exchange #{exchange}"
      end

      # Deal with running AMQP inside of a sync environment. This is useful
      # for script/console testing and our pe_result_processor.
      def sync_publish(message, exchange)
        Bunny.run(Push.config.amqp.to_hash) { |channel| 
          channel.exchange(exchange, :type => :fanout).publish(message)
        }
      end

      # Run this puppy inside of Em.
      def async_publish(message, exchange)
        channel = ::AMQP::Channel.new(connection)
        channel.fanout(exchange).publish(message)
        EM.next_tick {
          channel.close
          logger.debug "Channel closed"
        }
      end

      # Make sure we setup and use one connection per Push instance. AMQP 
      # wants to minimize connects/reconnects, so we connect once here and 
      # let publish use this connection.
      def connection
        @connection ||= ::AMQP.connect(Push.config.amqp.to_hash)
      end
    end

    class Test
      include Push::Logging
      
      def publish(message, exchange_name)
        logger.debug "Published '#{message}' to exchange #{exchange}"
        exchange[exchange_name] << message
      end

      def exchange
        @exchange ||= Hash.new {|h,k| h[k] = []} # Default hash empty hash values with an array (instead of nil)
      end
    end

    def self.register_adapter(name, adapter)
      adapters[name.to_sym] = adapter
    end

    def self.adapter(name=Push.config.backend)
      adapters[name.to_sym].new
    end

    def self.adapters
      @adapters ||= {}
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
push-0.0.1 lib/push/backend.rb