Sha256: 9afeb7a11cb6e7ab722af4f1cd7545df2294240f62d150ecfb7156f4194b3d85

Contents?: true

Size: 1.64 KB

Versions: 2

Compression:

Stored size: 1.64 KB

Contents

module HornetQ::Client
  # Create a Server for receiving requests and replying
  #   to arbitrary queues
  # Create an instance of this class per thread
  class Server
    def initialize(session, request_queue, timeout)
      @session = session
      @consumer = session.create_consumer(request_queue)
      @producer = session.create_producer
      @timeout = timeout
    end
  
    def run(&block)
      while request_message = @consumer.receive(@timeout)
        # Block should return a message reply object, pass in request
        # TODO: ensure..
        reply_message = block.call(request_message)
      
        # Send a reply?
        reply(request_message, reply_message) if request_message.request?
        request_message.acknowledge
      end
    end

    # Send a reply to the received request message
    #   request: is the message received
    #   reply:   is the message to send to the client
    def reply(request_message, reply_message)
      if request_message.request?
        # Reply should have same durability as request
        reply_message.durable = request_message.durable?
        reply_to = request_message.getSimpleStringProperty(Java::OrgHornetqCoreClientImpl::ClientMessageImpl::REPLYTO_HEADER_NAME);
        # Send request message id back in reply message for correlation purposes
        reply_message.user_id = request_message.user_id
        @producer.send(reply_to, reply_message)
        #puts "Sent reply to #{reply_to.to_s}: #{reply_message.inspect}"
      end
      request_message.acknowledge
    end
  
    # Close out resources
    def close
      @consumer.close if @consumer
      @producer.close if @producer
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
jruby-hornetq-0.2.3.alpha lib/hornetq/client/server.rb
jruby-hornetq-0.2.1.alpha lib/hornetq/client/server.rb