require 'java' require File.dirname(__FILE__) + '/junit.jar' require File.dirname(__FILE__) + '/commons-cli-1.1.jar' require File.dirname(__FILE__) + '/commons-io-1.2.jar' require File.dirname(__FILE__) + '/rabbitmq-client.jar' class RabbitMQClient include ObjectSpace include_class('com.rabbitmq.client.Connection') include_class('com.rabbitmq.client.ConnectionParameters') include_class('com.rabbitmq.client.ConnectionFactory') include_class('com.rabbitmq.client.Channel') include_class('com.rabbitmq.client.Consumer') include_class('com.rabbitmq.client.DefaultConsumer') include_class('com.rabbitmq.client.QueueingConsumer') include_class('com.rabbitmq.client.MessageProperties') include_class('java.lang.InterruptedException') include_class('java.lang.String') { |package, name| "J#{name}" } class RabbitMQClientError < StandardError;end class QueueConsumer < DefaultConsumer def initialize(channel, block) @channel = channel @block = block super(channel) end def handleDelivery(consumer_tag, envelope, properties, body) delivery_tag = envelope.get_delivery_tag message_body = Marshal.load(String.from_java_bytes(body)) # TODO: Do we need to do something with properties? message_body @channel.basic_ack(delivery_tag, false) end end class Queue def initialize(name, channel, durable=false) @name = name @durable = durable @channel = channel @channel.queue_declare(name, durable) self end def bind(exchange, routing_key='') raise RabbitMQClientError, "queue and exchange has different durable property" unless @durable == exchange.durable @routing_key = routing_key @exchange = exchange @channel.queue_bind(@name,, @routing_key) self end def unbind @channel.queue_unbind(@name,, @routing_key) if @exchange end # Set props for different type of message. Currently they are: # RabbitMQClient::MessageProperties::MINIMAL_BASIC # RabbitMQClient::MessageProperties::MINIMAL_PERSISTENT_BASIC # RabbitMQClient::MessageProperties::BASIC # RabbitMQClient::MessageProperties::PERSISTENT_BASIC # RabbitMQClient::MessageProperties::TEXT_PLAIN # RabbitMQClient::MessageProperties::PERSISTENT_TEXT_PLAIN def publish(message_body, props=nil) auto_bind message_body_byte = Marshal.dump(message_body).to_java_bytes @channel.basic_publish(, @routing_key, props, message_body_byte) message_body end def persistent_publish(message_body, props=MessageProperties::PERSISTENT_TEXT_PLAIN) raise RabbitMQClientError, "can only publish persistent message to durable queue" unless @durable publish(message_body, props) end def retrieve auto_bind message_body = nil no_ack = false response = @channel.basic_get(@name, no_ack) if response props = response.get_props message_body = Marshal.load(String.from_java_bytes(response.get_body)) delivery_tag = response.get_envelope.get_delivery_tag @channel.basic_ack(delivery_tag, false) end message_body end def subscribe(&block) no_ack = false @channel.basic_consume(@name, no_ack,, block)) end def loop_subscribe(&block) no_ack = false consumer = @channel.basic_consume(@name, no_ack, consumer) loop do begin delivery = consumer.next_delivery message_body = Marshal.load(String.from_java_bytes(delivery.get_body)) message_body @channel.basic_ack(delivery.get_envelope.get_delivery_tag, false) rescue InterruptedException => ie next end end end def purge @channel.queue_purge(@name) end def delete @channel.queue_delete(@name) end protected def auto_bind unless @exchange exchange ="#{@name}_exchange", 'fanout', @channel, @durable) self.bind(exchange) end end end class Exchange attr_reader :name attr_reader :durable def initialize(name, type, channel, durable=false) @name = name @type = type @durable = durable @channel = channel # Declare a non-passive, auto-delete exchange @channel.exchange_declare(@name, type.to_s, false, durable, true, nil) self end end # Class Methods class << self end attr_reader :channel attr_reader :connection # Instance Methods def initialize(options={}) # server address @host = options[:host] || '' @port = options[:port] || 5672 # login details @username = options[:username] || 'guest' @password = options[:password] || 'guest' @vhost = options[:vhost] || '/' # queues and exchanges @queues = {} @exchanges = {} connect unless options[:no_auto_connect] # Disconnect before the object is destroyed define_finalizer(self, lambda {|id| self.disconnect if self.connected? }) self end def connect params = params.set_username(@username) params.set_password(@password) params.set_virtual_host(@vhost) params.set_requested_heartbeat(0) conn_factory = @connection = conn_factory.new_connection(@host, @port) @channel = @connection.create_channel end def disconnect @queues.values.each { |q| q.unbind } @channel.close @connection.close @connection = nil end def connected? @connection != nil end def queue(name, durable=false) @queues[name] ||=, @channel, durable) end def exchange(name, type='fanout', durable=false) @exchanges[name] ||=, type, @channel, durable) end end