Sha256: fd563860fb4cbe482558809f9f29f2bd9ee12674e00f72ea6da1c04864c88fa3

Contents?: true

Size: 1.93 KB

Versions: 2

Compression:

Stored size: 1.93 KB

Contents

#require 'thread'
module Rack
  module AMQP
    module Client
      # A single outgoing request together with the state needed to block a
      # caller until a reply has been handed to #callback.
      #
      # The target URI is split into a routing key (everything before the
      # first "/") and a request path (the remainder, prefixed with "/").
      class Request

        # NOTE(review): the original source had a dangling `SLEEP_INCREMENT =`
        # with no right-hand side, so Ruby bound the constant to the return
        # value of the `attr_accessor` call on the following line. Nothing in
        # this class reads the constant; 0.1 is a placeholder -- TODO confirm
        # the intended value (or drop the constant if nothing else uses it).
        SLEEP_INCREMENT = 0.1

        attr_accessor :routing_key, :request_path, :body, :request_id, :callback_queue, :http_method

        # @param request_id [Object] published as the message_id
        # @param http_method [Object] published as the message type
        # @param uri [String] "target.queue.name/request/path?params=things"
        # @param body [Object] returned unchanged by #payload
        # @param headers [Hash] extra headers; :path is merged in by #headers
        def initialize(request_id, http_method, uri, body, headers={})
          # Not known at construction time; assigned later through the
          # callback_queue= accessor (required before #publishing_options).
          @callback_queue             = nil
          @request_id                 = request_id
          @http_method                = http_method
          @headers                    = headers
          @body                       = body
          @routing_key, @request_path = split_uri uri
          @response                   = nil
          @mutex                      = Mutex.new
          @resource                   = ConditionVariable.new
        end

        # Blocks until #callback has stored a response or +timeout+ elapses.
        #
        # A response that arrived before this call is returned immediately
        # instead of being missed, and spurious wake-ups do not end the wait
        # early. The stored response is cleared before returning, so each
        # response is handed out at most once.
        #
        # @param timeout [Numeric, nil] seconds to wait; nil waits indefinitely
        # @return [Object, nil] the response, or nil when the wait timed out
        def reply_wait(timeout)
          deadline = timeout && (monotonic_now + timeout)
          @mutex.synchronize do
            until @response
              remaining = deadline && (deadline - monotonic_now)
              break if remaining && remaining <= 0
              @resource.wait(@mutex, remaining)
            end
            resp = @response
            @response = nil
            resp
          end
        end

        # Stores the reply and wakes a thread blocked in #reply_wait.
        # Presumably invoked by the consumer on the reply queue -- confirm
        # against the caller.
        #
        # @param delivery_info [Object] passed through to Response.new
        # @param meta [Object] passed through to Response.new
        # @param payload [Object] passed through to Response.new
        def callback(delivery_info, meta, payload)
          @mutex.synchronize do
            # Store first, then signal, so the waiter's predicate is already
            # true when it re-acquires the mutex.
            @response = Response.new(meta, payload, delivery_info)
            @resource.signal
          end
        end

        # Options describing how this request should be published.
        # Requires callback_queue to have been assigned (calls #name on it).
        #
        # @return [Hash]
        def publishing_options
          {
            mandatory: true, # receive an error on routing error
            message_id: request_id,
            reply_to: callback_queue.name,
            type: http_method,
            app_id: user_agent,
            timestamp: Time.now.to_i,
            headers: headers,
            routing_key: routing_key
          }
        end

        # @return [Object] the request body, unchanged
        def payload
          body
        end

        # @return [Hash] the headers given at construction plus path: request_path
        #   (a new hash; the original headers hash is not mutated)
        def headers
          @headers.merge(path: request_path)
        end

        # @return [String] identifier sent as the app_id publishing option
        def user_agent
          "rack-amqp-client-#{VERSION}"
        end

        # Splits a URI on its first "/" into routing key and request path.
        #
        # @param uri [String] expecting target.queue.name/request/path?params=things&others=stuff
        # @return [Array(String, String)] [routing_key, request_path]; the path
        #   always starts with "/" and is just "/" when the URI has none
        def split_uri(uri)
          queue, path = uri.split('/', 2)
          [queue.to_s, "/#{path}"]
        end

        private

        # Monotonic clock reading in seconds; immune to wall-clock adjustments
        # while measuring the remaining wait time.
        def monotonic_now
          Process.clock_gettime(Process::CLOCK_MONOTONIC)
        end

      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
rack-amqp-client-0.0.3 lib/rack/amqp/client/request.rb
rack-amqp-client-0.0.2 lib/rack/amqp/client/request.rb