lib/rabbitmq_client.rb in jerryluk-rabbitmq-jruby-client-0.1.4 vs lib/rabbitmq_client.rb in jerryluk-rabbitmq-jruby-client-0.1.5
- old
+ new
@@ -10,49 +10,67 @@
include_class('com.rabbitmq.client.ConnectionParameters')
include_class('com.rabbitmq.client.ConnectionFactory')
include_class('com.rabbitmq.client.Channel')
include_class('com.rabbitmq.client.Consumer')
include_class('com.rabbitmq.client.DefaultConsumer')
+ include_class('com.rabbitmq.client.MessageProperties')
include_class('java.lang.String') { |package, name| "J#{name}" }
+ class RabbitMQClientError < StandardError;end
+
class QueueConsumer < DefaultConsumer
def initialize(channel, block)
@channel = channel
@block = block
super(channel)
end
def handleDelivery(consumer_tag, envelope, properties, body)
delivery_tag = envelope.get_delivery_tag
message_body = Marshal.load(String.from_java_bytes(body))
+ # TODO: Do we need to do something with properties?
@block.call message_body
@channel.basic_ack(delivery_tag, false)
end
end
class Queue
- def initialize(name, channel)
+ def initialize(name, channel, durable=false)
@name = name
+ @durable = durable
@channel = channel
- @channel.queue_declare(name)
+ @channel.queue_declare(name, durable)
self
end
- def bind(exchange, routing_key=nil)
- @routing_key = routing_key || "#{@name}_#{Time.now.to_i.to_s}_#{rand(1<<64).to_s}"
+ def bind(exchange, routing_key='')
+ @routing_key = routing_key
@exchange = exchange
+ raise RabbitMQClientError, "queue and exchange has different durable property" unless @durable == exchange.durable
@channel.queue_bind(@name, @exchange.name, @routing_key)
self
end
+ # Set props for different type of message. Currently they are:
+ # RabbitMQClient::MessageProperties::MINIMAL_BASIC
+ # RabbitMQClient::MessageProperties::MINIMAL_PERSISTENT_BASIC
+ # RabbitMQClient::MessageProperties::BASIC
+ # RabbitMQClient::MessageProperties::PERSISTENT_BASIC
+ # RabbitMQClient::MessageProperties::TEXT_PLAIN
+ # RabbitMQClient::MessageProperties::PERSISTENT_TEXT_PLAIN
def publish(message_body, props=nil)
auto_bind
message_body_byte = Marshal.dump(message_body).to_java_bytes
@channel.basic_publish(@exchange.name, @routing_key, props, message_body_byte)
message_body
end
+ def persistent_publish(message_body, props=MessageProperties::PERSISTENT_BASIC)
+ raise RabbitMQClientError, "can only publish persistent message to durable queue" unless @durable
+ publish(message_body, props)
+ end
+
def retrieve
auto_bind
message_body = nil
no_ack = false
response = @channel.basic_get(@name, no_ack)
@@ -71,24 +89,26 @@
end
protected
def auto_bind
unless @exchange
- exchange = Exchange.new("#{@name}_exchange", 'fanout', @channel)
+ exchange = Exchange.new("#{@name}_exchange", 'fanout', @channel, @durable)
self.bind(exchange)
end
end
end
class Exchange
attr_reader :name
+ attr_reader :durable
- def initialize(name, type, channel)
+ def initialize(name, type, channel, durable=false)
@name = name
@type = type
+ @durable = durable
@channel = channel
- @channel.exchange_declare(@name, type.to_s)
+ @channel.exchange_declare(@name, type.to_s, durable)
self
end
end
# Class Methods
@@ -97,11 +117,11 @@
attr_reader :channel
attr_reader :connection
# Instance Methods
- def initialize(options={})
+ def initialize(options={:auto_connect => true})
# server address
@host = options[:host] || '127.0.0.1'
@port = options[:port] || 5672
# login details
@@ -111,11 +131,11 @@
# queues and exchanges
@queues = {}
@exchanges = {}
- connect
+ connect if options[:auto_connect]
# Disconnect before the object is destroyed
define_finalizer(self, lambda {|id| self.disconnect if self.connected? })
self
end
@@ -138,14 +158,14 @@
def connected?
@connection != nil
end
- def queue(name)
- @queues[name] ||= Queue.new(name, @channel)
+ def queue(name, durable=false)
+ @queues[name] ||= Queue.new(name, @channel, durable)
end
- def exchange(name, type='fanout')
- @exchanges[name] ||= Exchange.new(name, type, @channel)
+ def exchange(name, type='fanout', durable=false)
+ @exchanges[name] ||= Exchange.new(name, type, @channel, durable)
end
end