Sha256: f0b16dfc78c31dde00016308c47b1be4212f1a1ccb24e5b0586df4bf6b499884
Contents?: true
Size: 1.52 KB
Versions: 1
Compression:
Stored size: 1.52 KB
Contents
module HotBunnies import com.rabbitmq.client.DefaultConsumer class BaseConsumer < DefaultConsumer attr_accessor :consumer_tag def initialize(channel) super(channel) @channel = channel @cancelling = JavaConcurrent::AtomicBoolean.new @cancelled = JavaConcurrent::AtomicBoolean.new @terminated = JavaConcurrent::AtomicBoolean.new end def handleDelivery(consumer_tag, envelope, properties, body) body = String.from_java_bytes(body) headers = Headers.new(channel, consumer_tag, envelope, properties) deliver(headers, body) end def handleCancel(consumer_tag) @cancelled.set(true) @channel.unregister_consumer(consumer_tag) if f = @opts[:on_cancellation] case f.arity when 0 then f.call when 1 then f.call(self) when 2 then f.call(@channel, self) when 3 then f.call(@channel, self, consumer_tag) else f.call(@channel, self, consumer_tag) end end @terminated.set(true) end def handleCancelOk(consumer_tag) @cancelled.set(true) @channel.unregister_consumer(consumer_tag) @terminated.set(true) end def start end def deliver(headers, message) raise NotImplementedError, 'To be implemented by a subclass' end def cancelled? @cancelling.get || @cancelled.get end def active? !terminated? end def terminated? @terminated.get end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
hot_bunnies-2.0.0.pre9-java | lib/hot_bunnies/consumers/base.rb |