Sha256: 262b845fc78327e235774e5160b7ba46d66370fb53e6f77c8c51811e43135f1a

Contents?: true

Size: 1.86 KB

Versions: 1

Compression:

Stored size: 1.86 KB

Contents

require "hot_bunnies/consumers/base"

module HotBunnies
  # Callback consumer that parks incoming deliveries on an internal blocking
  # queue; the thread that calls #start takes them off the queue and invokes
  # the callback for each one.
  #
  # NOTE(review): @cancelling, @cancelled and @terminated are used here with
  # #get/#set but are not assigned in this class - presumably atomic flags
  # created by a superclass; confirm against consumers/base.
  class BlockingCallbackConsumer < CallbackConsumer
    include JavaConcurrent

    # Sentinel offered to the internal queue by #cancel and
    # #gracefully_shut_down; #start treats it as a request to stop.
    POISON = :__poison__

    # channel     - forwarded to the superclass constructor.
    # buffer_size - when truthy, the internal queue is an ArrayBlockingQueue
    #               of that size; otherwise a LinkedBlockingQueue created
    #               without a capacity argument.
    # opts        - stored in @opts; not read anywhere else in this class.
    # callback    - forwarded to the superclass constructor.
    def initialize(channel, buffer_size, opts, callback)
      super(channel, callback)
      @internal_queue = buffer_size ? ArrayBlockingQueue.new(buffer_size) : LinkedBlockingQueue.new
      @opts = opts
    end

    # Cancels this consumer on the channel, then marks it cancelled and
    # terminated and offers POISON to the internal queue.
    #
    # Returns whatever channel.basic_cancel returned.
    def cancel
      @cancelling.set(true)
      cancel_result = channel.basic_cancel(consumer_tag)
      @cancelled.set(true)

      @internal_queue.offer(POISON)
      @terminated.set(true)

      cancel_result
    end

    # Consumes the internal queue on the calling thread until the consumer is
    # cancelling/cancelled or the thread reports itself interrupted, then
    # drains whatever is still queued and marks the consumer terminated.
    #
    # An InterruptedException raised while taking or while running the
    # callback is remembered rather than propagated; the thread is
    # re-interrupted just before this method returns.
    def start
      was_interrupted = false

      # Shared by the blocking loop and the drain loop below.
      handle = lambda do |item|
        if item == POISON
          @cancelling.set(true)
        else
          callback(*item)
        end
      end

      until @cancelling.get || @cancelled.get || JavaConcurrent::Thread.current_thread.interrupted?
        begin
          item = @internal_queue.take
          handle.call(item) if item
        rescue InterruptedException
          was_interrupted = true
        end
      end

      # Non-blocking drain: poll yields a falsy value once the queue is empty.
      while (item = @internal_queue.poll)
        handle.call(item)
      end

      @terminated.set(true)
      JavaConcurrent::Thread.current_thread.interrupt if was_interrupted
    end

    # Enqueues a delivery (the splatted arguments, kept together as one
    # array) for the #start loop.
    #
    # While the consumer is cancelling/cancelled, or the calling thread is
    # interrupted, the delivery is handed over with #offer and its result is
    # returned; otherwise #put is used, and an InterruptedException raised by
    # it is turned back into an interrupt on the calling thread.
    def deliver(*pair)
      winding_down = @cancelling.get || @cancelled.get ||
                     JavaConcurrent::Thread.current_thread.interrupted?
      return @internal_queue.offer(pair) if winding_down

      begin
        @internal_queue.put(pair)
      rescue InterruptedException
        JavaConcurrent::Thread.current_thread.interrupt
      end
    end

    # Flags the consumer as cancelling, offers POISON to the internal queue
    # and marks the consumer terminated. Unlike #cancel, nothing is sent to
    # the channel.
    def gracefully_shut_down
      @cancelling.set(true)
      @internal_queue.offer(POISON)

      @terminated.set(true)
    end

  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
hot_bunnies-2.0.0.pre9-java lib/hot_bunnies/consumers/blocking.rb