lib/rabbitmq_client.rb in jerryluk-rabbitmq-jruby-client-0.1.6 vs lib/rabbitmq_client.rb in jerryluk-rabbitmq-jruby-client-0.1.7

- old
+ new

@@ -10,11 +10,13 @@ include_class('com.rabbitmq.client.ConnectionParameters') include_class('com.rabbitmq.client.ConnectionFactory') include_class('com.rabbitmq.client.Channel') include_class('com.rabbitmq.client.Consumer') include_class('com.rabbitmq.client.DefaultConsumer') + include_class('com.rabbitmq.client.QueueingConsumer') include_class('com.rabbitmq.client.MessageProperties') + include_class('java.lang.InterruptedException') include_class('java.lang.String') { |package, name| "J#{name}" } class RabbitMQClientError < StandardError;end class QueueConsumer < DefaultConsumer @@ -84,9 +86,25 @@ end def subscribe(&block) no_ack = false @channel.basic_consume(@name, no_ack, QueueConsumer.new(@channel, block)) + end + + def loop_subscribe(&block) + no_ack = false + consumer = QueueingConsumer.new(@channel) + @channel.basic_consume(@name, no_ack, consumer) + loop do + begin + delivery = consumer.next_delivery + message_body = Marshal.load(String.from_java_bytes(delivery.get_body)) + block.call message_body + @channel.basic_ack(delivery.get_envelope.get_delivery_tag, false) + rescue InterruptedException => ie + next + end + end end protected def auto_bind unless @exchange