Sha256: b034622bcce739d95923e62590fc51f7756d0f7807636fe51d3771d2a941d0b9

Contents?: true

Size: 1.8 KB

Versions: 3

Compression:

Stored size: 1.8 KB

Contents

require 'rubygems'
require 'SecureRandom'
require 'eventmachine'
require 'amqp'

module Factor
  module Runtime
    class MessageBus
      attr_accessor :host, :vhost, :username, :token, :connection, :channel, :exchange, :queue
    
      def initialize(email,token)
        @host = "queue.factor.io"
        @vhost = email
        @username=email
        @token=token
      end
      
    
      # Creates the connection and creates a topic exchange
      # An exchange references a place to send messages to
      # the exchange routes it to the queues based on the route_key
      def start(topic="workflow",&code)
        EventMachine.run do
          #connection_settings={:host=>@host,:user=>@username,:password=>@token,:vhost=>@vhost}
          connection_settings={:host=>@host}
          @connection = AMQP.connect(connection_settings)
          @channel  = AMQP::Channel.new(connection)
          @exchange = @channel.topic(topic,:auto_delete=>true) # new topic exchange
          code.call
        end
      end
    
      # creates a new queue to listen to the topic exchange
      def listen(routing_key="#",&code)
        queue_name=SecureRandom.hex
        @queue = @channel.queue(queue_name)
        @queue.bind(@exchange, :routing_key=>routing_key) # bind queue to the Exchange
      
        @queue.subscribe do |headers,payload|
          message = Message.new
          message.from_queue headers.routing_key, payload
          code.call(message)
        end
      end
    
      def send(message)
        @exchange.publish(message.payload,:routing_key => message.route)
      end
    
      def send_and_close(message)
        send(message)
      
        EM.add_timer(1, Proc.new { close})
      
      end
    
      def close
        @connection.close{ EventMachine.stop }
      
      end
    
    end
  
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
factor-0.0.82 lib/runtime/message_bus.rb
factor-0.0.81 lib/runtime/message_bus.rb
factor-0.0.8 lib/runtime/message_bus.rb