lib/rabbitmq_client.rb in jerryluk-rabbitmq-jruby-client-0.1.6 vs lib/rabbitmq_client.rb in jerryluk-rabbitmq-jruby-client-0.1.7
- old
+ new
@@ -10,11 +10,13 @@
include_class('com.rabbitmq.client.ConnectionParameters')
include_class('com.rabbitmq.client.ConnectionFactory')
include_class('com.rabbitmq.client.Channel')
include_class('com.rabbitmq.client.Consumer')
include_class('com.rabbitmq.client.DefaultConsumer')
+ include_class('com.rabbitmq.client.QueueingConsumer')
include_class('com.rabbitmq.client.MessageProperties')
+ include_class('java.lang.InterruptedException')
include_class('java.lang.String') { |package, name| "J#{name}" }
class RabbitMQClientError < StandardError;end
class QueueConsumer < DefaultConsumer
@@ -84,9 +86,25 @@
end
def subscribe(&block)
no_ack = false
@channel.basic_consume(@name, no_ack, QueueConsumer.new(@channel, block))
+ end
+
+ def loop_subscribe(&block)
+ no_ack = false
+ consumer = QueueingConsumer.new(@channel)
+ @channel.basic_consume(@name, no_ack, consumer)
+ loop do
+ begin
+ delivery = consumer.next_delivery
+ message_body = Marshal.load(String.from_java_bytes(delivery.get_body))
+ block.call message_body
+ @channel.basic_ack(delivery.get_envelope.get_delivery_tag, false)
+ rescue InterruptedException => ie
+ next
+ end
+ end
end
protected
def auto_bind
unless @exchange