Sha256: e43506eedbf7f0dbc770995212d7d866da30329de6cda9820b4a89a4e2ab73f8

Contents?: true

Size: 1.99 KB

Versions: 1

Compression:

Stored size: 1.99 KB

Contents

module Rack
  module AMQP
    module Client
      # Sends HTTP-style requests over an AMQP channel and waits for the
      # matching reply.
      #
      # Every outgoing request gets a unique correlation id and is kept in a
      # list of pending requests. A single, lazily created callback queue is
      # subscribed once; when a message arrives, the correlation id found in
      # its metadata is used to look up the pending request, which is then
      # handed the delivery via Request#callback.
      class Manager
        attr_accessor :connection_options, :amqp_channel, :amqp_client

        # Connects to the broker and prepares the bookkeeping state.
        #
        # @param broker_connection_options options passed verbatim to Bunny.new
        def initialize(broker_connection_options)
          # State is set up before connecting so the object is never observed
          # half-initialized by anything triggered from the connection.
          @correlation_id = 0
          @incomplete_requests = []
          @mutex = Mutex.new
          self.connection_options = broker_connection_options
          connect!(broker_connection_options)
        end

        # Publishes a request to the default exchange and waits for its reply.
        #
        # @param uri passed through to Request.new
        # @param options [Hash]
        #   :http_method - passed through to Request.new
        #   :timeout     - passed to Request#reply_wait (defaults to 5;
        #                  presumably seconds -- confirm against Request)
        #   :body        - request body (defaults to "")
        #   :headers     - extra headers; override the defaults (optional)
        # @return whatever Request#reply_wait returns
        def request(uri, options={})
          timeout = options[:timeout] || 5
          body = options[:body] || ""
          # options[:headers] is optional; merging nil would raise TypeError.
          headers = default_headers(body).merge(options[:headers] || {})

          request = register_request(options[:http_method], uri, body, headers)
          begin
            request.callback_queue = create_callback_queue
            amqp_channel.direct('').publish(request.payload, request.publishing_options)
            request.reply_wait(timeout)
          ensure
            # Drop the request if it is still pending (timeout or failure) so
            # the pending list cannot grow without bound. A no-op when the
            # subscriber has already removed it.
            @mutex.synchronize do
              @incomplete_requests.delete_if { |pending| pending.equal?(request) }
            end
          end
        end

        private

        # Default headers for a request carrying +body+.
        # Content-Length is a byte count, hence bytesize rather than length.
        def default_headers(body)
          {
            'Content-Type' => 'application/x-www-form-urlencoded',
            'Content-Length' => body.bytesize
          }
        end

        # Builds a Request with the next correlation id and records it as
        # pending. Both steps happen under the mutex so concurrent callers
        # can never be handed the same id.
        def register_request(http_method, uri, body, headers)
          @mutex.synchronize do
            request = Request.new((@correlation_id += 1).to_s, http_method, uri, body, headers)
            @incomplete_requests << request
            request
          end
        end

        # Removes and returns the pending request whose request_id equals
        # +correlation_id+, or nil when there is none.
        def take_pending_request(correlation_id)
          @mutex.synchronize do
            request = @incomplete_requests.find { |r| r.request_id == correlation_id }
            @incomplete_requests.delete(request) if request
            request
          end
        end

        # Returns the shared callback queue, creating and subscribing it on
        # first use.
        def create_callback_queue
          @callback_queue ||= begin
            # build queue
            queue = amqp_channel.queue("", auto_delete: true, exclusive: true)
            queue.subscribe do |di, meta, payload|
              request = take_pending_request(meta[:correlation_id])
              # A reply with no pending request (late or unknown id) is ignored.
              request.callback(di, meta, payload) if request
            end
            # bind to an exchange, maybe later
            queue
          end
        end

        # Opens the broker connection and a channel on it.
        def connect!(broker_options)
          self.amqp_client = Bunny.new(broker_options)
          amqp_client.start
          self.amqp_channel = amqp_client.create_channel
        end

      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
rack-amqp-client-0.0.2 lib/rack/amqp/client/manager.rb