Sha256: f0b16dfc78c31dde00016308c47b1be4212f1a1ccb24e5b0586df4bf6b499884

Contents?: true

Size: 1.52 KB

Versions: 1

Compression:

Stored size: 1.52 KB

Contents

module HotBunnies
  import com.rabbitmq.client.DefaultConsumer

  # Base class for consumers. Extends the RabbitMQ Java client's
  # DefaultConsumer, converts incoming deliveries into a (headers, body) pair
  # and hands them to #deliver, which subclasses must implement.
  #
  # Three atomic flags track the lifecycle:
  # * @cancelling - never set to true in this class; presumably set by
  #   subclasses while a cancellation is in progress (TODO confirm).
  # * @cancelled  - set once either cancellation callback has fired.
  # * @terminated - set at the end of either cancellation callback.
  class BaseConsumer < DefaultConsumer
    attr_accessor :consumer_tag

    # @param channel the channel this consumer is attached to. It is passed
    #   to DefaultConsumer and also kept in @channel, which must respond to
    #   #unregister_consumer.
    def initialize(channel)
      super(channel)
      @channel    = channel

      @cancelling = JavaConcurrent::AtomicBoolean.new
      @cancelled  = JavaConcurrent::AtomicBoolean.new

      @terminated = JavaConcurrent::AtomicBoolean.new
    end

    # Delivery callback: turns the Java byte array into a Ruby String, wraps
    # the delivery metadata in a Headers object and forwards both to #deliver.
    def handleDelivery(consumer_tag, envelope, properties, body)
      body    = String.from_java_bytes(body)
      # NOTE(review): `channel` here is the accessor (presumably JRuby's
      # mapping of DefaultConsumer#getChannel), not @channel - left as is.
      headers = Headers.new(channel, consumer_tag, envelope, properties)

      deliver(headers, body)
    end

    # Cancellation callback that additionally runs the optional
    # :on_cancellation hook taken from @opts.
    #
    # The hook is called with arguments chosen by its arity:
    #   0 -> ()
    #   1 -> (consumer)
    #   2 -> (channel, consumer)
    #   anything else (including negative/variadic arities)
    #     -> (channel, consumer, consumer_tag)
    #
    # @opts is not assigned by this class, so it may be nil unless a subclass
    # sets it; in that case no hook is run.
    def handleCancel(consumer_tag)
      @cancelled.set(true)
      @channel.unregister_consumer(consumer_tag)

      begin
        callback = @opts && @opts[:on_cancellation]
        if callback
          case callback.arity
          when 0 then callback.call
          when 1 then callback.call(self)
          when 2 then callback.call(@channel, self)
          else        callback.call(@channel, self, consumer_tag)
          end
        end
      ensure
        # Mark the consumer terminated even if the hook raises, so that
        # #active? does not keep reporting a cancelled consumer as alive.
        @terminated.set(true)
      end
    end

    # Cancellation callback without a user hook: flags the consumer as
    # cancelled, unregisters it from the channel and marks it terminated.
    def handleCancelOk(consumer_tag)
      @cancelled.set(true)
      @channel.unregister_consumer(consumer_tag)

      @terminated.set(true)
    end

    # No-op in the base class; subclasses may override.
    def start
    end

    # Handles a single delivery. Must be overridden.
    #
    # @param headers [Headers] delivery metadata built in #handleDelivery
    # @param message [String] message payload
    # @raise [NotImplementedError] always, in this base class
    def deliver(headers, message)
      raise NotImplementedError, 'To be implemented by a subclass'
    end

    # @return [Boolean] true if either the cancelling or the cancelled flag
    #   is set
    def cancelled?
      @cancelling.get || @cancelled.get
    end

    # @return [Boolean] true until the consumer has been terminated
    def active?
      !terminated?
    end

    # @return [Boolean] true once a cancellation callback has completed
    def terminated?
      @terminated.get
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
hot_bunnies-2.0.0.pre9-java lib/hot_bunnies/consumers/base.rb